聊聊kingbus的command.go
序
本文主要研究一下kingbus的command.go
Close
kingbus/mysql/command.go
//Close the Connfunc (c *Conn) Close() {
if c.closed.Load() == true {
return
}
c.closed.Store(true)
c.Conn.Close()
c.cancel()
c.Conn = nil
}
- Close方法先通过c.closed.Load()判断连接是否已经关闭,已关闭则直接返回;否则依次执行c.closed.Store(true)、c.Conn.Close()、c.cancel(),最后将c.Conn置为nil
handleQuery
kingbus/mysql/command.go
// handleQuery parses sql as a single statement and dispatches it to the
// handler matching the statement type: SHOW, SELECT, SET and KILL are
// supported, every other statement yields ErrSQLNotSupport.
//
// A parse failure is returned as is. A panic raised while parsing or handling
// the statement is recovered, logged together with a stack trace and returned
// through the named result err.
func (c *Conn) handleQuery(sql string) (err error) {
	defer func() {
		e := recover()
		if e == nil {
			return
		}
		// A panic value is not necessarily an error (e.g. panic("msg")).
		// Convert anything else so the caller never sees a nil error for a
		// query that actually blew up.
		myerr, ok := e.(error)
		if !ok {
			myerr = fmt.Errorf("%v", e)
		}
		const size = 4096
		buf := make([]byte, size)
		buf = buf[:runtime.Stack(buf, false)]
		log.Log.Errorf("Conn handleQuery error,err:%s,sql:%s,stack:%s", myerr, sql, string(buf))
		err = myerr
	}()

	log.Log.Infof("handleQuery sql:%s", sql)

	sqlParser := parser.New()
	stmt, err := sqlParser.ParseOneStmt(sql, "", "")
	if err != nil {
		return err
	}

	switch v := stmt.(type) {
	case *ast.ShowStmt:
		return c.handleShow(v)
	case *ast.SelectStmt:
		return c.handleSelect(v)
	case *ast.SetStmt:
		return c.handleSet(v)
	case *ast.KillStmt:
		return c.handleKill(v)
	//todo get metrics of slave
	default:
		return ErrSQLNotSupport
	}
}
- handleQuery方法通过sqlParser来解析stmt,然后根据stmt的type来执行不同的方法;对于ShowStmt执行c.handleShow(v),对于SelectStmt执行c.handleSelect(v),对于SetStmt执行c.handleSet(v),对于KillStmt执行c.handleKill(v)
writeOK
kingbus/mysql/resp.go
// writeOK builds an OK packet from r and sends it with c.WritePacket.
// A nil r is treated as an empty result. The connection status flags are
// OR-ed into r.Status, so a non-nil r passed by the caller is modified.
func (c *Conn) writeOK(r *gomysql.Result) error {
	if r == nil {
		r = new(gomysql.Result)
	}
	r.Status |= c.status

	// The first four bytes are left zeroed in front of the payload —
	// presumably room for the packet header filled in by WritePacket.
	pkt := make([]byte, 4, 32)
	pkt = append(pkt, gomysql.OK_HEADER)
	pkt = append(pkt, gomysql.PutLengthEncodedInt(r.AffectedRows)...)
	pkt = append(pkt, gomysql.PutLengthEncodedInt(r.InsertId)...)

	protocol41 := c.capability&gomysql.CLIENT_PROTOCOL_41 > 0
	if protocol41 {
		// Status flags, low byte first, followed by two zero bytes
		// (presumably the warning count, always reported as 0).
		pkt = append(pkt, byte(r.Status), byte(r.Status>>8), 0, 0)
	}

	return c.WritePacket(pkt)
}
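- writeOK方法用于构造并发送OK包:r为nil时使用空的gomysql.Result,将c.status合并到r.Status,依次写入OK_HEADER、AffectedRows、InsertId;当capability包含CLIENT_PROTOCOL_41时再追加两个字节的status以及两个字节的0,最后通过c.WritePacket发送。例如handleRegisterSlave注册成功后就是通过c.writeOK(nil)来响应的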
handleBinlogDumpGtid
kingbus/mysql/command.go
//todo kill the slave with same uuidfunc (c *Conn) handleBinlogDumpGtid(ctx context.Context, data []byte) error {
var (
err error
heartbeatPeriod time.Duration
)
slaveGtidExecuted, slaveServerID, err := c.parseMysqlGtidDumpPacket(data)
if err != nil {
return err
}
//UnregisterSlave
slaveUUID := c.userVariables[SlaveUUID].(string)
defer c.binlogServer.UnregisterSlave(slaveUUID)
err = c.binlogServer.CheckGtidSet(gomysql.MySQLFlavor, slaveGtidExecuted)
if err != nil {
log.Log.Errorf("CheckGtidSet error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)
return err
}
//get the previousGtidEvent raft index
preGtidEventIndex, err := c.binlogServer.GetMySQLDumpAt(slaveGtidExecuted)
if err != nil {
log.Log.Errorf("GetMySQLDumpAt error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)
return err
}
fde, err := c.binlogServer.GetFde(preGtidEventIndex)
if err != nil {
log.Log.Errorf("handleBinlogDumpGtid:GetFde error,err:%s, gtidSet: %s,flavor:%s",
err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)
return err
}
//1.send fake rotate event
masterServerID := binary.LittleEndian.Uint32(fde[5:])
fileName, err := c.binlogServer.GetNextBinlogFile(preGtidEventIndex)
if err != nil {
log.Log.Errorf("handleBinlogDumpGtid:GetNextBinlogFile error,err:%s, gtidSet: %s,flavor:%s",
err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)
return err
}
err = c.sendFakeRotateEvent(masterServerID, fileName)
if err != nil {
log.Log.Errorf("handleBinlogDumpGtid:sendFakeRotateEvent error,err:%s, serverId: %d,fileName:%s",
err, masterServerID, fileName)
return err
}
//2.send fde
err = c.sendFormatDescriptionEvent(fde)
if err != nil {
log.Log.Errorf("handleBinlogDumpGtid:sendFormatDescriptionEvent error,err:%s, fde:%v",
err, fde)
return err
}
//3.send event
eventC := make(chan *storagepb.BinlogEvent, 2000)
errorC := make(chan error, 1)
err = c.binlogServer.DumpBinlogAt(ctx, preGtidEventIndex, slaveGtidExecuted, eventC, errorC)
if err != nil {
log.Log.Errorf("DumpBinlogAt error,err:%s,preGtidEventIndex:%d,slaveGtidExecuted:%v",
err, preGtidEventIndex, slaveGtidExecuted)
return err
}
//4.new metrics
slaveEps := metrics.NewMeter()
slaveThroughput := metrics.NewMeter()
metrics.Register(fmt.Sprintf("slave_eps_%d", slaveServerID), slaveEps)
metrics.Register(fmt.Sprintf("slave_thoughput_%d", slaveServerID), slaveThroughput)
if period, ok := c.userVariables[MasterHeartbeatPeriod]; ok {
heartbeatPeriod = time.Duration(period.(int64))
} else {
heartbeatPeriod = MaxHeartbeatPeriod
}
timer := time.NewTimer(heartbeatPeriod)
for {
select {
case event := <-eventC:
//event is not divided or the first divided event
//WriteEvent need write a ok_header(one byte),after the header size
if event.DividedCount == 0 || (0 < event.DividedCount && event.DividedSeqNum == 0) {
err = c.WriteEvent(event.Data, true)
if err != nil {
log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
return err
}
} else {
err = c.WriteEvent(event.Data, false)
if err != nil {
log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
return err
}
//event is divided,and the last packet size is MaxPayloadLen
//need send a empty packet
//https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
if event.DividedSeqNum == event.DividedCount-1 && len(event.Data) == MaxPayloadLen {
err = c.WriteEvent(nil, false)
if err != nil {
log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
return err
}
}
}
slaveEps.Mark(1)
slaveThroughput.Mark(int64(len(event.Data)))
//reset heartbeat period
resetTime(timer, heartbeatPeriod)
case err = <-errorC:
log.Log.Errorf("binlog server DumpBinlogAt error,err:%s", err)
return err
case <-ctx.Done():
log.Log.Errorf("handleBinlogDumpGtid:ctx done,quit")
return ctx.Err()
case <-timer.C:
//kingbus send the heartbeat log event which received by syncer to slave
log.Log.Debugf("send a heartbeat log event to slave")
err = c.sendHeartbeatEvent(masterServerID)
if err != nil {
return err
}
//reset heartbeat period
resetTime(timer, heartbeatPeriod)
}
}
return nil
}
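- handleBinlogDumpGtid方法先通过c.parseMysqlGtidDumpPacket解析请求,并用CheckGtidSet校验slave的gtid集合,再通过GetMySQLDumpAt、GetFde、GetNextBinlogFile获取起始的raft index、fde及binlog文件名;之后先执行c.sendFakeRotateEvent,再执行c.sendFormatDescriptionEvent,接着执行c.binlogServer.DumpBinlogAt,最后在for循环中通过select来处理eventC(发送binlog事件)、errorC、ctx.Done()以及timer.C(发送心跳事件);方法返回时通过defer执行UnregisterSlave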
handleRegisterSlave
kingbus/mysql/command.go
// handleRegisterSlave decodes a slave registration packet, registers the slave
// with the binlog server and answers with an OK packet.
//
// Layout of data as decoded below (integers are little-endian):
//
//	4 bytes                 server id
//	1 byte length + bytes   host name
//	1 byte length + bytes   user
//	1 byte length + bytes   password
//	2 bytes                 port
//	4 bytes                 rank
//	4 bytes                 master id
//
// The packet comes from the client, so every read is bounds-checked; a
// truncated packet yields an error instead of an index-out-of-range panic,
// and nothing is killed or registered in that case. Errors from RegisterSlave
// and from writing the OK packet are returned unchanged.
func (c *Conn) handleRegisterSlave(data []byte) error {
	var s Slave
	pos := 0

	// need verifies that n more bytes are available starting at pos.
	need := func(n int) error {
		if len(data)-pos < n {
			return fmt.Errorf("handleRegisterSlave: malformed packet: need %d bytes at offset %d, packet length %d",
				n, pos, len(data))
		}
		return nil
	}
	// readString decodes a one-byte length followed by that many bytes.
	readString := func() (string, error) {
		if err := need(1); err != nil {
			return "", err
		}
		n := int(data[pos])
		pos++
		if err := need(n); err != nil {
			return "", err
		}
		str := string(data[pos : pos+n])
		pos += n
		return str, nil
	}

	var err error

	if err = need(4); err != nil {
		return err
	}
	s.ServerID = int32(binary.LittleEndian.Uint32(data[pos:]))
	pos += 4

	if s.HostName, err = readString(); err != nil {
		return err
	}
	if s.User, err = readString(); err != nil {
		return err
	}
	if s.Password, err = readString(); err != nil {
		return err
	}

	// port (2) + rank (4) + master id (4)
	if err = need(10); err != nil {
		return err
	}
	s.Port = int16(binary.LittleEndian.Uint16(data[pos:]))
	pos += 2
	s.Rank = binary.LittleEndian.Uint32(data[pos:])
	pos += 4
	s.MasterID = binary.LittleEndian.Uint32(data[pos:])

	s.State = REGISTERED

	//kill the zombie dump thread with same uuid
	c.killZombieDumpThreads()

	// A missing or non-string uuid variable leaves UUID as "".
	s.UUID, _ = c.userVariables[SlaveUUID].(string)
	s.ConnectTime = time.Now()
	s.Conn = c

	err = c.binlogServer.RegisterSlave(&s)
	if err != nil {
		return err
	}

	// Log the identifying fields only; the password sent by the slave must
	// not end up in the log file.
	log.Log.Infof("handleRegisterSlave:slave info:serverID:%d,host:%s,user:%s,port:%d,uuid:%s",
		s.ServerID, s.HostName, s.User, s.Port, s.UUID)
	return c.writeOK(nil)
}
- handleRegisterSlave方法用于处理COM_REGISTER_SLAVE命令,这里先执行c.killZombieDumpThreads(),然后执行c.binlogServer.RegisterSlave(&s)
小结
kingbus的command.go提供了Close、handleQuery、writeOK、handleBinlogDumpGtid、handleRegisterSlave等方法
doc
- command
以上是 聊聊kingbus的command.go 的全部内容, 来源链接: utcz.com/z/517683.html