聊聊kingbus的binlog_syncer_handler.go

编程

本文主要研究一下kingbus的binlog_syncer_handler.go

StartBinlogSyncer

kingbus/api/binlog_syncer_handler.go

// StartBinlogSyncer handles the request that starts a binlog syncer.
//
// The request body is bound into a config.SyncerArgs and validated with
// args.Check. On a node that is not the leader, the arguments are re-encoded
// as JSON and forwarded to the leader with a PUT to /binlog/syncer/start, and
// the leader's reply is relayed to the client. On the leader, the syncer
// server is started and the arguments are then proposed through
// ProposeSyncerArgs.
//
// h.l is held for the whole call.
func (h *BinlogSyncerHandler) StartBinlogSyncer(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	var (
		args     config.SyncerArgs
		err      error
		syncerID int // never assigned below, so the success payload is always 0
	)

	// If the function-level err is set when the handler exits, log it and
	// write a 500 response. Failures in the forwarding branch are kept in
	// their own variables and answered there, so they never reach this hook.
	defer func() {
		if err == nil {
			return
		}
		log.Log.Errorf("StartBinlogSyncer error,err: %s", err)
		echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
	}()

	if err = echoCtx.Bind(&args); err != nil {
		return err
	}

	// Validate the arguments.
	if err = args.Check(); err != nil {
		return err
	}

	// Not the leader: forward the request and relay the leader's answer.
	if !h.svr.IsLeader() {
		payload, marshalErr := json.Marshal(args)
		if marshalErr != nil {
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(marshalErr.Error()))
		}

		leaderResp, sendErr := h.sendToLeader("PUT", "/binlog/syncer/start", payload)
		if sendErr != nil {
			log.Log.Errorf("sendToLeader error,err:%s,args:%v", sendErr, args)
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(sendErr.Error()))
		}

		if leaderResp.Message == "success" {
			return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
		}
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
	}

	// Leader: start the syncer server.
	if err = h.svr.StartServer(config.SyncerServerType, &args); err != nil {
		log.Log.Errorf("StartServer error,err:%s,args:%v", err, args)
		return err
	}

	// Propose the syncer start arguments.
	if err = h.ProposeSyncerArgs(&args); err != nil {
		log.Log.Errorf("ProposeSyncerArgs error,err:%s,args:%v", err, args)
		return err
	}

	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerID))
}

  • StartBinlogSyncer方法先执行echoCtx.Bind(&args)绑定请求参数,并通过args.Check()校验;然后针对h.svr.IsLeader()为false的情况,通过h.sendToLeader("PUT", "/binlog/syncer/start", req)将请求转发给leader;为true时则执行h.svr.StartServer(config.SyncerServerType, &args)启动syncer server,再执行h.ProposeSyncerArgs(&args)提交(propose)syncer的启动参数

StopBinlogSyncer

kingbus/api/binlog_syncer_handler.go

// StopBinlogSyncer handles the request that stops the binlog syncer.
//
// On the leader it calls h.svr.StopServer for the syncer server type and
// replies 200 with an empty-string payload. On any other node the request is
// forwarded to the leader (PUT /binlog/syncer/stop, no body) and the leader's
// outcome is relayed to the client.
//
// h.l is held for the whole call.
func (h *BinlogSyncerHandler) StopBinlogSyncer(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	// Leader: stop the syncer server locally.
	if h.svr.IsLeader() {
		h.svr.StopServer(config.SyncerServerType)
		return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
	}

	// Not the leader: forward the request.
	leaderResp, sendErr := h.sendToLeader("PUT", "/binlog/syncer/stop", nil)
	switch {
	case sendErr != nil:
		log.Log.Errorf("sendToLeader error,err:%s", sendErr)
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(sendErr.Error()))
	case leaderResp.Message != "success":
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
	}

	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
}

  • StopBinlogSyncer方法对于h.svr.IsLeader()为false的通过h.sendToLeader("PUT", "/binlog/syncer/stop", nil)将请求转发给leader;为true的则执行h.svr.StopServer(config.SyncerServerType)

GetBinlogSyncerStatus

kingbus/api/binlog_syncer_handler.go

// GetBinlogSyncerStatus handles the request that reports the binlog syncer's
// runtime status.
//
// A non-leader node forwards the request to the leader (GET
// /binlog/syncer/status, no body) and relays the outcome. The leader asks
// h.svr for the syncer server status and returns it when the value is a
// *config.SyncerStatus; any other value yields a 500 with the error "no resp".
//
// h.l is held for the whole call.
func (h *BinlogSyncerHandler) GetBinlogSyncerStatus(echoCtx echo.Context) error {
	h.l.Lock()
	defer h.l.Unlock()

	// Not the leader: forward the request and relay the leader's answer.
	if !h.svr.IsLeader() {
		leaderResp, sendErr := h.sendToLeader("GET", "/binlog/syncer/status", nil)
		if sendErr != nil {
			log.Log.Errorf("sendToLeader error,err:%s", sendErr)
			return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(sendErr.Error()))
		}

		if leaderResp.Message == "success" {
			return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(leaderResp.Data))
		}
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(leaderResp.Message))
	}

	// Leader: report the local syncer status.
	syncerStatus, ok := h.svr.GetServerStatus(config.SyncerServerType).(*config.SyncerStatus)
	if !ok {
		return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))
	}
	return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(syncerStatus))
}

  • GetBinlogSyncerStatus方法对于h.svr.IsLeader()为false的通过h.sendToLeader("GET", "/binlog/syncer/status", nil)方法转发给leader;为true的则执行h.svr.GetServerStatus(config.SyncerServerType)获取status

sendToLeader

kingbus/api/membership_handler.go

// sendToLeader sends req with the given HTTP method to the /members endpoint
// of the current leader and returns the leader's decoded response.
//
// It returns ErrNoLeader when the leader is not a known cluster member or
// when the leader does not advertise exactly one admin URL. A failure to
// parse that URL or to send the request is returned as-is.
func (h *MembershipHandler) sendToLeader(method string, req []byte) (*utils.Resp, error) {
	leader := h.cluster.Member(h.svr.Leader())
	if leader == nil {
		return nil, ErrNoLeader
	}

	if len(leader.AdminURLs) != 1 {
		log.Log.Errorf("leader admin url is not 1,leader:%v", *leader)
		return nil, ErrNoLeader
	}

	adminURL, err := url.Parse(leader.AdminURLs[0])
	if err != nil {
		return nil, err
	}

	// Only the scheme and host of the admin URL are kept; the path is fixed.
	target := adminURL.Scheme + "://" + adminURL.Host + "/members"

	leaderResp, err := utils.SendRequest(method, target, req)
	if err != nil {
		log.Log.Errorf("sendToLeader:SendRequest error,err:%s,url:%s", err, target)
		return nil, err
	}
	return leaderResp, nil
}

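  • 这里展示的是MembershipHandler的sendToLeader(请求路径固定为/members);BinlogSyncerHandler中调用的sendToLeader还多一个path参数,其实现未在本文列出。该方法通过h.svr.Leader()获取leaderID,然后执行h.cluster.Member(leaderID)获取leader;leader不存在或其AdminURLs个数不为1时返回ErrNoLeader,否则根据AdminURLs[0]构造leaderURL,再执行utils.SendRequest(method, url, req)发送请求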

SendRequest

kingbus/utils/http_utils.go

// SendRequest issues an HTTP request with the given method to leaderURL and
// decodes the JSON reply into a Resp.
//
// data is sent as the request body with the Content-Type header set to
// "application/json;charset=utf-8". The response status code is not
// inspected: the body is read in full and must be valid JSON for Resp.
// Any failure while building, sending, reading or decoding returns a nil
// Resp together with the error.
func SendRequest(method string, leaderURL string, data []byte) (*Resp, error) {
	httpReq, err := http.NewRequest(method, leaderURL, bytes.NewBuffer(data))
	if err != nil {
		return nil, err
	}
	httpReq.Header.Set("Content-Type", "application/json;charset=utf-8")

	var client http.Client
	httpResp, err := client.Do(httpReq)
	if err != nil {
		return nil, err
	}
	defer httpResp.Body.Close()

	body, err := ioutil.ReadAll(httpResp.Body)
	if err != nil {
		return nil, err
	}

	var decoded Resp
	if err := json.Unmarshal(body, &decoded); err != nil {
		return nil, err
	}
	return &decoded, nil
}

  • SendRequest通过http.NewRequest构建request,其Content-Type为application/json;charset=utf-8,之后通过client.Do(req)发送请求,然后通过ioutil.ReadAll(resp.Body)读取respBody,再通过json.Unmarshal(respBody, r)解析json为Resp类型

小结

kingbus的binlog_syncer_handler.go提供了StartBinlogSyncer、StopBinlogSyncer、GetBinlogSyncerStatus方法

doc

  • binlog_syncer_handler

以上是 聊聊kingbus的binlog_syncer_handler.go 的全部内容, 来源链接: utcz.com/z/517560.html

回到顶部