聊聊kingbus的binlog_server_handler.go

编程

本文主要研究一下kingbus的binlog_server_handler.go

StartBinlogServer

kingbus/api/binlog_server_handler.go

// StartBinlogServer is the HTTP handler that starts the binlog server.
//
// The request body is bound into a config.BinlogServerConfig, checked via
// args.Check with the IP reported by h.svr.GetIP, and then handed to
// h.svr.StartServer with config.BinlogServerType. If any of these steps
// fails, the error is logged, a JSON error body is written with status 500,
// and the error is returned. On success a JSON body with empty data is
// written with status 200. The lock h.l is held for the whole call.
func (h *BinlogServerHandler) StartBinlogServer(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	// fail reports cause to the log and to the client, then hands it back so
	// call sites can write `return fail(err)`.
	fail := func(cause error) error {
		log.Log.Errorf("StartBinlogServer error,err: %s", cause)
		echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(cause.Error()))
		return cause
	}

	var (
		args config.BinlogServerConfig
		err  error
	)

	// Decode the request into args.
	if err = echoCtx.Bind(&args); err != nil {
		return fail(err)
	}

	// Validate the arguments against this node's IP.
	nodeIP := h.svr.GetIP()
	if err = args.Check(nodeIP); err != nil {
		return fail(err)
	}

	// Start the binlog sub server.
	if err = h.svr.StartServer(config.BinlogServerType, &args); err != nil {
		log.Log.Errorf("start server error,err:%s,args:%v", err, args)
		return fail(err)
	}

	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}

  • StartBinlogServer方法先通过echoCtx.Bind解析请求参数,再用args.Check(kingbusIP)校验参数,最后通过h.svr.StartServer(config.BinlogServerType, &args)来启动binlog server;任一步骤出错都会记录日志并返回http.StatusInternalServerError

StartServer

kingbus/server/server.go

// StartServer starts the sub server selected by svrType.
//
// For config.SyncerServerType, args must be a *config.SyncerArgs; after the
// syncer server starts, binlog proposing is started with the syncer's
// context. For config.BinlogServerType, args must be a
// *config.BinlogServerConfig.
//
// It returns ErrStarted when the requested sub server is already running and
// ErrArgs when args has the wrong dynamic type or the sub server fails to
// start (the underlying start error is only logged). Any other svrType is
// reported through log.Log.Fatalf.
func (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error {
	var err error

	switch svrType {
	case config.SyncerServerType:
		if s.IsSyncerStarted() {
			return ErrStarted
		}

		syncerCfg, isSyncerArgs := args.(*config.SyncerArgs)
		if !isSyncerArgs {
			log.Log.Errorf("StartServer args is illegal,args:%v", args)
			return ErrArgs
		}

		if err = s.startSyncerServer(syncerCfg); err != nil {
			log.Log.Errorf("startSyncerServer error,err:%s,args:%v", err, *syncerCfg)
			return ErrArgs
		}

		// Begin proposing binlog events to the raft cluster.
		s.StartProposeBinlog(s.syncer.ctx)
		log.Log.Debugf("start syncer,and propose!!!")

	case config.BinlogServerType:
		if s.IsBinlogServerStarted() {
			return ErrStarted
		}

		masterCfg, isMasterArgs := args.(*config.BinlogServerConfig)
		if !isMasterArgs {
			log.Log.Errorf("StartServer args is illegal,args:%v", args)
			return ErrArgs
		}

		if err = s.startMasterServer(masterCfg); err != nil {
			log.Log.Errorf("startMasterServer error,err:%s,args:%v", err, *masterCfg)
			return ErrArgs
		}

	default:
		log.Log.Fatalf("StartServer:server type not support,serverType:%v", svrType)
	}

	return nil
}

  • StartServer方法根据svrType参数来启动不同的server,若是config.SyncerServerType则启动的是SyncerServer,若是config.BinlogServerType则启动的是BinlogServer

StopBinlogServer

kingbus/api/binlog_server_handler.go

// StopBinlogServer is the HTTP handler that stops the binlog server.
//
// While holding the lock h.l it asks h.svr to stop the
// config.BinlogServerType sub server, then writes a JSON body with empty
// data and status 200. The returned error is whatever echoCtx.JSON returns.
func (h *BinlogServerHandler) StopBinlogServer(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	h.svr.StopServer(config.BinlogServerType)

	okResp := utils.NewResp().SetData("")
	return echoCtx.JSON(http.StatusOK, okResp)
}

  • StopBinlogServer方法通过h.svr.StopServer(config.BinlogServerType)来关闭binlog server

StopServer

kingbus/server/server.go

// StopServer stops the sub server selected by svrType.
//
// The syncer (config.SyncerServerType) or the binlog master
// (config.BinlogServerType) is stopped only if it is currently started;
// otherwise the call does nothing. Any other svrType is reported through
// log.Log.Fatalf.
func (s *KingbusServer) StopServer(svrType config.SubServerType) {
	switch svrType {
	case config.SyncerServerType:
		if !s.IsSyncerStarted() {
			return
		}
		s.syncer.Stop()

	case config.BinlogServerType:
		if !s.IsBinlogServerStarted() {
			return
		}
		s.master.Stop()

	default:
		log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)
	}
}

  • StopServer方法根据svrType参数来关闭不同的server,若是config.SyncerServerType则关闭的是SyncerServer,若是config.BinlogServerType则关闭的是BinlogServer

GetBinlogServerStatus

kingbus/api/binlog_server_handler.go

// GetBinlogServerStatus is the HTTP handler that reports the runtime status
// of the binlog server.
//
// While holding the lock h.l it fetches the status for
// config.BinlogServerType from h.svr. If the result is a
// *config.BinlogServerStatus it is written as the JSON data with status 200;
// otherwise a JSON error body "no resp" is written with status 500.
func (h *BinlogServerHandler) GetBinlogServerStatus(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	masterStatus, isBinlogStatus := h.svr.GetServerStatus(config.BinlogServerType).(*config.BinlogServerStatus)
	if !isBinlogStatus {
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))
	}

	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(masterStatus))
}

  • GetBinlogServerStatus方法通过h.svr.GetServerStatus(config.BinlogServerType)来获取server status

GetServerStatus

kingbus/server/server.go

// GetServerStatus returns the status of the sub server selected by svrType.
//
// For config.SyncerServerType it returns a *config.SyncerStatus and for
// config.BinlogServerType a *config.BinlogServerStatus. When the sub server
// is not started, only the Status field is set (to
// config.ServerStoppedStatus). When it is started, the status carries the
// sub server's connection settings plus the current GTID, last binlog
// file/position, executed GTID set and the purged GTID set read from the
// store. Any other svrType is reported through log.Log.Fatalf, after which
// nil is returned.
//
// NOTE(review): the configured passwords are copied into the returned status
// as-is; confirm that callers exposing this over HTTP intend that.
func (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{} {
	switch svrType {
	case config.SyncerServerType:
		var syncerStatus config.SyncerStatus
		if s.IsSyncerStarted() {
			cfg := s.syncer.cfg
			syncerStatus.MysqlAddr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
			syncerStatus.MysqlUser = cfg.User
			syncerStatus.MysqlPassword = cfg.Password
			syncerStatus.SemiSync = cfg.SemiSyncEnabled
			syncerStatus.Status = config.ServerRunningStatus

			syncerStatus.CurrentGtid = s.CurrentGtidStr()
			syncerStatus.LastBinlogFile = s.LastBinlogFile()
			syncerStatus.LastFilePosition = s.LastFilePosition()
			syncerStatus.ExecutedGtidSet = s.ExecutedGtidSetStr()

			purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)
			if err != nil {
				// Presumably Fatalf terminates the process, so purgedGtids is
				// never dereferenced after a failure — verify against the logger.
				log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)
			}
			syncerStatus.PurgedGtidSet = purgedGtids.String()
		} else {
			syncerStatus.Status = config.ServerStoppedStatus
		}
		return &syncerStatus

	case config.BinlogServerType:
		var status config.BinlogServerStatus
		if s.IsBinlogServerStarted() {
			cfg := s.master.cfg
			status.Addr = cfg.Addr
			status.User = cfg.User
			status.Password = cfg.Password

			// Copy the slaves into a fresh slice. The loop variable is named
			// slave so that it does not shadow the receiver s.
			slaves := s.master.GetSlaves()
			status.Slaves = make([]*mysql.Slave, 0, len(slaves))
			for _, slave := range slaves {
				status.Slaves = append(status.Slaves, slave)
			}

			status.CurrentGtid = s.CurrentGtidStr()
			status.LastBinlogFile = s.LastBinlogFile()
			status.LastFilePosition = s.LastFilePosition()
			status.ExecutedGtidSet = s.ExecutedGtidSetStr()

			purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)
			if err != nil {
				// Presumably Fatalf terminates the process, so purgedGtids is
				// never dereferenced after a failure — verify against the logger.
				log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)
			}
			status.PurgedGtidSet = purgedGtids.String()
			status.Status = config.ServerRunningStatus
		} else {
			status.Status = config.ServerStoppedStatus
		}
		return &status

	default:
		// The message names this function; it previously said "StopServer".
		log.Log.Fatalf("GetServerStatus:server type not support,serverType:%v", svrType)
	}

	return nil
}

  • GetServerStatus方法根据svrType参数来获取不同的status,若是config.SyncerServerType则获取的是SyncerStatus,若是config.BinlogServerType则获取的是BinlogServerStatus

小结

kingbus的binlog_server_handler提供了StartBinlogServer、StopBinlogServer、GetBinlogServerStatus三个方法,它们分别委托给了server.go的StartServer、StopServer、GetServerStatus方法

doc

  • binlog_server_handler

以上是 聊聊kingbus的binlog_server_handler.go 的全部内容, 来源链接: utcz.com/z/517593.html

回到顶部