聊聊kingbus的command.go

编程

本文主要研究一下kingbus的command.go

Close

kingbus/mysql/command.go

// Close shuts the connection down. It returns immediately when the closed
// flag is already set; otherwise it sets the flag, closes c.Conn, calls
// c.cancel and finally clears the c.Conn reference.
func (c *Conn) Close() {
	if wasClosed := c.closed.Load(); wasClosed == true {
		// An earlier call already released everything.
		return
	}

	// Raise the flag before tearing anything down so that a later call stops
	// at the guard above.
	c.closed.Store(true)

	c.Conn.Close()
	c.cancel()
	c.Conn = nil
}

  • Close方法先通过c.closed.Load()判断连接是否已关闭,已关闭则直接返回;否则依次执行c.closed.Store(true)、c.Conn.Close()、c.cancel(),最后将c.Conn置为nil

handleQuery

kingbus/mysql/command.go

// handleQuery parses a single SQL statement and dispatches it to the handler
// that matches its statement type. SHOW, SELECT, SET and KILL statements are
// supported; any other statement yields ErrSQLNotSupport. A parse error is
// returned unchanged.
//
// A panic raised while the statement is being handled is recovered, logged
// together with the SQL text and a stack trace, and handed back to the caller
// through the named result err.
func (c *Conn) handleQuery(sql string) (err error) {
	defer func() {
		e := recover()
		if e == nil {
			return
		}

		// A panic value is not necessarily an error (e.g. panic("msg")).
		// Wrap anything else so that a recovered panic is never reported to
		// the caller as a nil error.
		panicErr, ok := e.(error)
		if !ok {
			panicErr = fmt.Errorf("%v", e)
		}

		const size = 4096
		buf := make([]byte, size)
		buf = buf[:runtime.Stack(buf, false)]
		log.Log.Errorf("Conn handleQuery error,err:%s,sql:%s,stack:%s", panicErr, sql, string(buf))
		err = panicErr
	}()

	log.Log.Infof("handleQuery sql:%s", sql)

	sqlParser := parser.New()
	stmt, err := sqlParser.ParseOneStmt(sql, "", "")
	if err != nil {
		return err
	}

	switch v := stmt.(type) {
	case *ast.ShowStmt:
		return c.handleShow(v)
	case *ast.SelectStmt:
		return c.handleSelect(v)
	case *ast.SetStmt:
		return c.handleSet(v)
	case *ast.KillStmt:
		return c.handleKill(v)
	// TODO: get metrics of slave
	default:
		return ErrSQLNotSupport
	}
}

  • handleQuery方法通过sqlParser来解析stmt,然后根据stmt的type来执行不同的方法;对于ShowStmt执行c.handleShow(v),对于SelectStmt执行c.handleSelect(v),对于SetStmt执行c.handleSet(v),对于KillStmt执行c.handleKill(v),其他类型则返回ErrSQLNotSupport;另外它通过defer加recover捕获处理过程中的panic,记录sql及堆栈后作为err返回

writeOK

kingbus/mysql/resp.go

// writeOK encodes r as an OK packet and sends it through WritePacket. A nil r
// is treated as an empty result. The connection's status bits are OR-ed into
// r.Status before encoding, so a non-nil r supplied by the caller is modified.
func (c *Conn) writeOK(r *gomysql.Result) error {
	result := r
	if result == nil {
		result = new(gomysql.Result)
	}
	result.Status |= c.status

	// The buffer starts with four zero bytes ahead of the payload.
	// NOTE(review): presumably space for the packet header that WritePacket
	// fills in — confirm against WritePacket.
	buf := make([]byte, 4, 32)
	buf = append(buf, gomysql.OK_HEADER)
	buf = append(buf, gomysql.PutLengthEncodedInt(result.AffectedRows)...)
	buf = append(buf, gomysql.PutLengthEncodedInt(result.InsertId)...)

	if c.capability&gomysql.CLIENT_PROTOCOL_41 > 0 {
		status := result.Status
		// Status low byte, status high byte, then two zero bytes.
		buf = append(buf, byte(status), byte(status>>8), 0, 0)
	}

	return c.WritePacket(buf)
}

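  • writeOK方法用于构造并发送OK包:依次写入OK_HEADER、AffectedRows、InsertId,若客户端支持CLIENT_PROTOCOL_41则再追加两个字节的status和两个0字节,最后通过c.WritePacket发送;例如handleRegisterSlave在注册成功后就通过c.writeOK(nil)来响应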

handleBinlogDumpGtid

kingbus/mysql/command.go

// handleBinlogDumpGtid serves a GTID based binlog dump request until an error
// occurs or ctx is done; it never returns nil.
//
// It parses the dump packet, validates the slave's executed GTID set, locates
// the raft index to start from, and then sends, in order: a fake rotate event,
// the format description event, and the stream of binlog events produced by
// DumpBinlogAt. While streaming, a heartbeat event is sent whenever no event
// has been written for heartbeatPeriod. The slave is unregistered from the
// binlog server when the function returns.
//
// TODO: kill the slave with same uuid
func (c *Conn) handleBinlogDumpGtid(ctx context.Context, data []byte) error {
	var (
		err             error
		heartbeatPeriod time.Duration
	)

	slaveGtidExecuted, slaveServerID, err := c.parseMysqlGtidDumpPacket(data)
	if err != nil {
		return err
	}

	// handleRegisterSlave registers a slave that has no SlaveUUID user
	// variable under the empty UUID, so a missing (or non-string) value is
	// mapped to "" here as well instead of panicking on the type assertion.
	slaveUUID, _ := c.userVariables[SlaveUUID].(string)
	defer c.binlogServer.UnregisterSlave(slaveUUID)

	err = c.binlogServer.CheckGtidSet(gomysql.MySQLFlavor, slaveGtidExecuted)
	if err != nil {
		log.Log.Errorf("CheckGtidSet error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)
		return err
	}

	// Get the raft index of the previous GTID event.
	preGtidEventIndex, err := c.binlogServer.GetMySQLDumpAt(slaveGtidExecuted)
	if err != nil {
		log.Log.Errorf("GetMySQLDumpAt error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)
		return err
	}

	fde, err := c.binlogServer.GetFde(preGtidEventIndex)
	if err != nil {
		log.Log.Errorf("handleBinlogDumpGtid:GetFde error,err:%s, gtidSet: %s,flavor:%s",
			err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)
		return err
	}

	// 1. Send the fake rotate event. The master server id is read as a
	// little-endian uint32 at byte offset 5 of the format description event.
	masterServerID := binary.LittleEndian.Uint32(fde[5:])
	fileName, err := c.binlogServer.GetNextBinlogFile(preGtidEventIndex)
	if err != nil {
		log.Log.Errorf("handleBinlogDumpGtid:GetNextBinlogFile error,err:%s, gtidSet: %s,flavor:%s",
			err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)
		return err
	}
	err = c.sendFakeRotateEvent(masterServerID, fileName)
	if err != nil {
		log.Log.Errorf("handleBinlogDumpGtid:sendFakeRotateEvent error,err:%s, serverId: %d,fileName:%s",
			err, masterServerID, fileName)
		return err
	}

	// 2. Send the format description event.
	err = c.sendFormatDescriptionEvent(fde)
	if err != nil {
		log.Log.Errorf("handleBinlogDumpGtid:sendFormatDescriptionEvent error,err:%s, fde:%v",
			err, fde)
		return err
	}

	// 3. Start the event stream; events and a terminal error arrive on the
	// two channels handed to DumpBinlogAt.
	eventC := make(chan *storagepb.BinlogEvent, 2000)
	errorC := make(chan error, 1)
	err = c.binlogServer.DumpBinlogAt(ctx, preGtidEventIndex, slaveGtidExecuted, eventC, errorC)
	if err != nil {
		log.Log.Errorf("DumpBinlogAt error,err:%s,preGtidEventIndex:%d,slaveGtidExecuted:%v",
			err, preGtidEventIndex, slaveGtidExecuted)
		return err
	}

	// 4. Create the per-slave meters.
	// NOTE(review): "thoughput" looks like a typo of "throughput"; the name is
	// kept because it is the key the meter is registered under. The meters
	// are not unregistered on return — confirm whether that is intended.
	slaveEps := metrics.NewMeter()
	slaveThroughput := metrics.NewMeter()
	metrics.Register(fmt.Sprintf("slave_eps_%d", slaveServerID), slaveEps)
	metrics.Register(fmt.Sprintf("slave_thoughput_%d", slaveServerID), slaveThroughput)

	if period, ok := c.userVariables[MasterHeartbeatPeriod]; ok {
		heartbeatPeriod = time.Duration(period.(int64))
	} else {
		heartbeatPeriod = MaxHeartbeatPeriod
	}

	timer := time.NewTimer(heartbeatPeriod)
	// Release the timer on every exit path.
	defer timer.Stop()

	for {
		select {
		case event := <-eventC:
			// An event that is not divided, or the first packet of a divided
			// event, is written with the ok header (one byte); every later
			// packet of a divided event is written without it.
			withHeader := event.DividedCount == 0 || (0 < event.DividedCount && event.DividedSeqNum == 0)
			err = c.WriteEvent(event.Data, withHeader)
			if err != nil {
				log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
				return err
			}

			// The event is divided and its last packet is exactly
			// MaxPayloadLen long, so an empty packet must follow.
			// https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
			if !withHeader && event.DividedSeqNum == event.DividedCount-1 && len(event.Data) == MaxPayloadLen {
				err = c.WriteEvent(nil, false)
				if err != nil {
					log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
					return err
				}
			}

			slaveEps.Mark(1)
			slaveThroughput.Mark(int64(len(event.Data)))
			// An event was just sent, so restart the heartbeat period.
			resetTime(timer, heartbeatPeriod)

		case err = <-errorC:
			log.Log.Errorf("binlog server DumpBinlogAt error,err:%s", err)
			return err

		case <-ctx.Done():
			log.Log.Errorf("handleBinlogDumpGtid:ctx done,quit")
			return ctx.Err()

		case <-timer.C:
			// Nothing was sent for a whole heartbeat period: send a heartbeat
			// log event to the slave.
			log.Log.Debugf("send a heartbeat log event to slave")
			err = c.sendHeartbeatEvent(masterServerID)
			if err != nil {
				return err
			}
			// Restart the heartbeat period.
			resetTime(timer, heartbeatPeriod)
		}
	}
}

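  • handleBinlogDumpGtid方法先执行c.sendFakeRotateEvent,再执行c.sendFormatDescriptionEvent,接着执行c.binlogServer.DumpBinlogAt,然后在for循环中通过select处理四类事件:从eventC收到binlog事件时通过c.WriteEvent发送给slave并重置心跳定时器;从errorC收到错误时返回该错误;ctx.Done()时返回ctx.Err();timer.C触发时通过c.sendHeartbeatEvent发送心跳事件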

handleRegisterSlave

kingbus/mysql/command.go

// handleRegisterSlave decodes a slave registration packet, registers the
// slave with the binlog server and answers with an OK packet.
//
// The packet is read in this order: server id (4 bytes, little-endian), then
// hostname, user and password (each a 1-byte length followed by that many
// bytes), then port (2 bytes), rank (4 bytes) and master id (4 bytes), all
// little-endian. A packet too short for any of these fields is rejected with
// an error instead of panicking on an out-of-range index.
//
// Returns the decoding error, the error from RegisterSlave, or the result of
// writeOK.
func (c *Conn) handleRegisterSlave(data []byte) error {
	var (
		s   Slave
		err error
	)
	pos := 0

	// readString consumes one length byte followed by that many bytes of
	// string data, advancing pos past both.
	readString := func(field string) (string, error) {
		if pos >= len(data) {
			return "", fmt.Errorf("handleRegisterSlave: malformed packet: missing %s length", field)
		}
		n := int(data[pos])
		pos++
		if n > len(data)-pos {
			return "", fmt.Errorf("handleRegisterSlave: malformed packet: %s needs %d bytes, %d left",
				field, n, len(data)-pos)
		}
		v := string(data[pos : pos+n])
		pos += n
		return v, nil
	}

	if len(data) < 4 {
		return fmt.Errorf("handleRegisterSlave: malformed packet: %d bytes, need 4 for server id", len(data))
	}
	s.ServerID = int32(binary.LittleEndian.Uint32(data[pos:]))
	pos += 4

	if s.HostName, err = readString("hostname"); err != nil {
		return err
	}
	if s.User, err = readString("user"); err != nil {
		return err
	}
	if s.Password, err = readString("password"); err != nil {
		return err
	}

	// Fixed-size tail: port (2) + rank (4) + master id (4).
	const tailLen = 2 + 4 + 4
	if len(data)-pos < tailLen {
		return fmt.Errorf("handleRegisterSlave: malformed packet: %d bytes left, need %d for port, rank and master id",
			len(data)-pos, tailLen)
	}
	// NOTE(review): the int16 conversion stores ports above 32767 as negative
	// values.
	s.Port = int16(binary.LittleEndian.Uint16(data[pos:]))
	pos += 2
	s.Rank = binary.LittleEndian.Uint32(data[pos:])
	pos += 4
	s.MasterID = binary.LittleEndian.Uint32(data[pos:])
	s.State = REGISTERED

	// Kill the zombie dump thread with the same uuid.
	c.killZombieDumpThreads()

	if uuid, ok := c.userVariables[SlaveUUID]; ok {
		s.UUID = uuid.(string)
	} else {
		s.UUID = ""
	}
	s.ConnectTime = time.Now()
	s.Conn = c

	if err = c.binlogServer.RegisterSlave(&s); err != nil {
		return err
	}

	// Log a copy with the password blanked so the credential does not end up
	// in the log.
	logged := s
	logged.Password = ""
	log.Log.Infof("handleRegisterSlave:slave info:%v", logged)

	return c.writeOK(nil)
}

  • handleRegisterSlave方法用于处理COM_REGISTER_SLAVE命令,这里先执行c.killZombieDumpThreads(),然后执行c.binlogServer.RegisterSlave(&s)

小结

kingbus的command.go提供了Close、handleQuery、handleBinlogDumpGtid、handleRegisterSlave等方法,resp.go提供了用于发送OK包的writeOK方法

doc

  • command

以上是 聊聊kingbus的command.go 的全部内容, 来源链接: utcz.com/z/517683.html

回到顶部