聊聊kingbus的startMasterServer

编程

本文主要研究一下kingbus的startMasterServer

startMasterServer

kingbus/server/server.go

// startMasterServer builds the binlog (master) server from args, stores it in
// s.master and starts it. If construction fails the error is logged and
// returned, and s.master is left untouched.
func (s *KingbusServer) startMasterServer(args *config.BinlogServerConfig) error {
	binlogSrv, createErr := NewBinlogServer(args, s, s.store, s.applyBroadcast)
	if createErr != nil {
		log.Log.Errorf("NewBinlogServer error,err:%s,args:%v", createErr, *args)
		return createErr
	}

	// Publish the server on s before starting it, then start via the field.
	s.master = binlogSrv
	s.master.Start()

	log.Log.Infof("startMasterServer success,args:%v", *args)
	return nil
}

  • startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法

NewBinlogServer

kingbus/server/binlog_server.go

// NewBinlogServer opens a TCP listener on cfg.Addr and returns a BinlogServer
// wired to the given KingbusInfo, storage and broadcast. The server is created
// in the not-started state with an empty slave table and an error channel of
// capacity 1. If the listen call fails, the error is logged and returned.
func NewBinlogServer(cfg *config.BinlogServerConfig, ki KingbusInfo, store storage.Storage, broadcast *utils.Broadcast) (*BinlogServer, error) {
	startedFlag := atomic.NewBool(false)

	ln, listenErr := net.Listen("tcp", cfg.Addr)
	if listenErr != nil {
		log.Log.Errorf("Listen error,err:%s,addr:%s", listenErr, cfg.Addr)
		return nil, listenErr
	}

	srv := &BinlogServer{
		started:     startedFlag,
		cfg:         cfg,
		listener:    ln,
		store:       store,
		broadcast:   broadcast,
		kingbusInfo: ki,
		slaves:      make(map[string]*mysql.Slave),
		errch:       make(chan error, 1),
	}
	return srv, nil
}

  • NewBinlogServer方法通过new方法创建BinlogServer,之后设置其listener、store、broadcast等属性

BinlogServer

kingbus/server/binlog_server.go

// BinlogServer is a binlog server that sends binlog events to slaves.
//
// The generic process a connecting slave goes through:
//
//  1. authentication
//     SHOW GLOBAL VARIABLES LIKE "BINLOG_CHECKSUM"
//     SET @master_binlog_checksum="NONE"
//     SET @master_heartbeat_period=%d
//  2. COM_REGISTER_SLAVE
//  3. semi-sync:
//     SHOW VARIABLES LIKE "rpl_semi_sync_master_enabled";
//     SET @rpl_semi_sync_slave = 1
//  4. COM_BINLOG_DUMP_GTID
type BinlogServer struct {

// started is set to true by Start; the accept loop keeps running while it is true.
started *atomic.Bool

// cfg is the server configuration; Addr is the listen address, and User and
// Password are handed to mysql.NewConn for each accepted connection.
cfg *config.BinlogServerConfig

// listener is the TCP listener opened on cfg.Addr by NewBinlogServer.
listener net.Listener

// errch has capacity 1; when the accept loop receives an error from it, the
// error is logged and the server is stopped.
errch chan error

// NOTE(review): presumably guards slaves; confirm against the methods that use it.
l sync.RWMutex

slaves map[string]*mysql.Slave //key is uuid

// broadcast, kingbusInfo and store are supplied by the caller of NewBinlogServer.
broadcast *utils.Broadcast

kingbusInfo KingbusInfo

store storage.Storage

}

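  • BinlogServer定义了started、cfg、listener、errch、l、slaves、broadcast、kingbusInfo、store属性,对应的类型分别为*atomic.Bool、*config.BinlogServerConfig、net.Listener、chan error、sync.RWMutex、map[string]*mysql.Slave、*utils.Broadcast、KingbusInfo、storage.Storage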

Start

kingbus/server/binlog_server.go

// Start marks the server as started and launches the accept loop in its own
// goroutine. On every iteration the loop first does a non-blocking check of
// errch: a pending error is logged, the server is stopped and the loop ends.
// Otherwise it blocks in Accept and serves each accepted connection in a new
// goroutine. Accept failures are logged and the loop re-checks the started
// flag before trying again.
func (s *BinlogServer) Start() {
	s.started.Store(true)

	go func() {
		for {
			if !s.started.Load() {
				return
			}

			// Non-blocking poll of the error channel.
			select {
			case runErr := <-s.errch:
				log.Log.Errorf("binlog server Run error,err:%s", runErr)
				s.Stop()
				return
			default:
			}

			conn, acceptErr := s.listener.Accept()
			if acceptErr != nil {
				log.Log.Infof("BinlogServer.Start:Accept error,err:%s", acceptErr)
				continue
			}
			go s.onConn(conn)
		}
	}()
}

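  • Start方法先执行s.started.Store(true),然后启动一个goroutine,在s.started为true期间循环:通过select检查s.errch,收到错误则记录日志、执行s.Stop()并返回;否则在default分支执行s.listener.Accept(),Accept出错则记录日志并继续下一轮循环,成功则异步执行s.onConn(conn)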

onConn

kingbus/server/binlog_server.go

// onConn turns an accepted network connection into a mysql.Conn, using the
// configured user and password, and runs it until it returns. If the
// connection cannot be set up, the error is logged and nothing else happens.
func (s *BinlogServer) onConn(c net.Conn) {
	client, setupErr := mysql.NewConn(c, s, s.cfg.User, s.cfg.Password)
	if setupErr == nil {
		client.Run()
		return
	}
	log.Log.Errorf("onConn error,err:%s", setupErr)
}

  • onConn方法通过mysql.NewConn创建mysqlConn,然后执行mysqlConn.Run方法

NewConn

kingbus/mysql/conn.go

// NewConn creates a Conn on top of an accepted network connection.
//
// It assigns a fresh connection ID, generates a 20-byte random salt, fetches
// the master info from s and performs the MySQL handshake using the master's
// version string and the given password. If any of these steps fails, the
// underlying connection is closed, the error is logged and (nil, err) is
// returned. On success the Conn gets its own cancellable context, an empty
// user-variable table and a reference to s.
func NewConn(conn net.Conn, s BinlogServer, user string, password string) (*Conn, error) {
	c := new(Conn)
	c.user = user
	c.BaseConn = NewBaseConn(conn)
	c.connectionID = baseConnID.Add(1)
	c.closed = atomic.NewBool(false)

	// Do not ignore a failed random read: continuing would run the handshake
	// with an empty salt.
	salt, err := gomysql.RandomBuf(20)
	if err != nil {
		c.BaseConn.Close()
		log.Log.Errorf("NewConn:RandomBuf error,err:%s", err)
		return nil, err
	}
	c.salt = salt

	masterInfo, err := s.GetMasterInfo()
	if err != nil {
		c.BaseConn.Close()
		log.Log.Errorf("NewConn:GetMasterInfo error,err:%s", err)
		return nil, err
	}

	err = c.handshake(masterInfo.Version, password)
	if err != nil {
		c.BaseConn.Close()
		log.Log.Errorf("NewConn:handshake error,err:%s", err)
		return nil, err
	}

	c.ctx, c.cancel = context.WithCancel(context.Background())
	c.userVariables = make(map[string]interface{})
	c.binlogServer = s
	return c, nil
}

  • NewConn方法通过s.GetMasterInfo()获取master信息,然后执行c.handshake(masterInfo.Version, password)

handshake

kingbus/mysql/conn.go

// handshake runs the server side of the MySQL handshake: it writes the
// initial handshake packet carrying serverVersion, reads and checks the
// client's response against password, and replies with OK. A failure while
// reading the response is also reported to the client on a best-effort basis
// before being returned. The packet sequence is reset after a successful
// handshake.
func (c *Conn) handshake(serverVersion, password string) error {
	err := c.writeInitialHandshake(serverVersion)
	if err != nil {
		return err
	}

	err = c.readHandshakeResponse(password)
	if err != nil {
		// Best effort: tell the client why it was rejected.
		c.writeError(err)
		return err
	}

	err = c.writeOK(nil)
	if err != nil {
		return err
	}

	c.ResetSequence()
	return nil
}

  • handshake方法实现的是mysql的handshake协议

Run

kingbus/mysql/conn.go

// Run serves client requests on the connection until the connection's
// context is cancelled, a packet cannot be read, dispatch returns an error,
// or the connection is marked closed. The connection is always closed on
// return. A panic raised while serving is recovered and logged together with
// a stack trace, whatever the type of the panic value.
func (c *Conn) Run() {
	defer func() {
		// Log every recovered panic, not only those whose value is an error;
		// otherwise panic("msg") would be swallowed without a trace.
		if r := recover(); r != nil {
			const size = 4096
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			log.Log.Errorf("Conn Run error,err:%v,stack:%s", r, string(buf))
		}
		c.Close()
		log.Log.Debugf("close client connection")
	}()

	for {
		select {
		case <-c.ctx.Done():
			log.Log.Debugf("BinlogServer closed, close connection")
			return
		default:
			data, err := c.ReadPacket()
			if err != nil {
				log.Log.Errorf("ReadPacket error,err:%s", err)
				return
			}

			if err := c.dispatch(c.ctx, data); err != nil {
				log.Log.Errorf("dispatch error,err:%s,data:%v", err.Error(), data)
				// If the error is context.Canceled, the connection was killed
				// by a command, so there is no need to send an error message.
				if err != context.Canceled && !c.closed.Load() {
					c.writeError(err)
				}
				return
			}

			// If the connection is closed, return from the loop.
			if c.closed.Load() {
				log.Log.Infof("connection status is closed,need return")
				return
			}

			// Each command starts a new packet sequence.
			c.Sequence = 0
		}
	}
}

  • Run方法循环通过c.ReadPacket()接收请求,然后通过c.dispatch(c.ctx, data)进行分发;当ctx被取消、读取或分发出错、或者连接已关闭时退出循环,并在defer中关闭连接

dispatch

kingbus/mysql/command.go

// dispatch routes one client packet to its handler. The first byte of data is
// the command and the remainder is the command payload.
//
// Supported commands are COM_QUIT, COM_QUERY, COM_PING, COM_BINLOG_DUMP_GTID
// and COM_REGISTER_SLAVE. Any other command is logged and answered with
// ErrSQLNotSupport. An empty packet carries no command byte and is rejected
// by returning ErrSQLNotSupport instead of panicking on data[0].
func (c *Conn) dispatch(ctx context.Context, data []byte) error {
	if len(data) == 0 {
		log.Log.Errorf("dispatch:empty command packet")
		return ErrSQLNotSupport
	}

	cmd := data[0]
	data = data[1:]

	switch cmd {
	case gomysql.COM_QUIT:
		c.Close()
		log.Log.Debugf("close client connection")
		return nil
	case gomysql.COM_QUERY:
		return c.handleQuery(utils.BytesToString(data))
	case gomysql.COM_PING:
		return c.writeOK(nil)
	case gomysql.COM_BINLOG_DUMP_GTID:
		return c.handleBinlogDumpGtid(ctx, data)
	case gomysql.COM_REGISTER_SLAVE:
		return c.handleRegisterSlave(data)
	default:
		log.Log.Errorf("master not support this cmd:%v", data)
		return c.writeError(ErrSQLNotSupport)
	}
}

  • dispatch根据不同的cmd来执行conn的不同方法

小结

startMasterServer方法先执行NewBinlogServer创建master,然后执行master的Start方法;master的Start方法先执行s.started.Store(true),然后在一个goroutine中循环:通过select检查s.errch,没有错误时在default分支执行s.listener.Accept(),之后异步执行s.onConn(conn);onConn方法通过mysql.NewConn创建mysqlConn,然后执行mysqlConn.Run方法;Run方法读取请求并通过dispatch根据不同的cmd分发给conn的不同方法

doc

  • server

以上是 聊聊kingbus的startMasterServer 的全部内容, 来源链接: utcz.com/z/517633.html

回到顶部