聊聊kingbus的membership_handler.go

编程

本文主要研究一下kingbus的membership_handler.go

GetMembers

kingbus/api/membership_handler.go

//GetMembers implements get information of membership, not include lead information

func (h *MembershipHandler) GetMembers(echoCtx echo.Context) error {

members := h.cluster.Members()

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))

}

  • GetMembers方法主要是通过h.cluster.Members()获取members,然后json化

AddMember

kingbus/api/membership_handler.go

//AddMember implements add a member into raft cluster

func (h *MembershipHandler) AddMember(echoCtx echo.Context) error {

args := struct {

NodeName string `json:"name"`

PeerURL string `json:"peer_url"`

AdminURL string `json:"admin_url"`

}{}

err := echoCtx.Bind(&args)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

isLeader := h.svr.IsLeader()

if isLeader == false {

req, err := json.Marshal(args)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

resp, err := h.sendToLeader("POST", req)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))

}

ctx, cancel := context.WithTimeout(context.Background(), h.timeout)

defer cancel()

peerURLs, err := types.NewURLs([]string{args.PeerURL})

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

adminURLs, err := types.NewURLs([]string{args.AdminURL})

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

now := time.Now()

member := membership.NewMember(args.NodeName, peerURLs, adminURLs, &(now))

members, err := h.svr.AddMember(ctx, *member)

switch {

case err == membership.ErrIDExists || err == membership.ErrPeerURLexists:

return echoCtx.JSON(http.StatusConflict, utils.NewResp().SetError(err.Error()))

case err != nil:

log.Log.Errorf("error adding member %s (%v)", member.ID, err)

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))

}

  • AddMember方法先通过h.svr.IsLeader()判断isLeader,为false的话通过h.sendToLeader("POST", req)转发给leader,为true的话则通过membership.NewMember创建member,然后通过h.svr.AddMember来添加member

UpdateMember

kingbus/api/membership_handler.go

//UpdateMember implements update member information

func (h *MembershipHandler) UpdateMember(echoCtx echo.Context) error {

args := struct {

NodeName string `json:"name"`

NewPeerURL string `json:"new_peer_url"`

}{}

err := echoCtx.Bind(&args)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

isLeader := h.svr.IsLeader()

if isLeader == false {

req, err := json.Marshal(args)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

resp, err := h.sendToLeader("PUT", req)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))

}

ctx, cancel := context.WithTimeout(context.Background(), h.timeout)

defer cancel()

m := h.cluster.MemberByName(args.NodeName)

if m == nil {

err = fmt.Errorf("no such member: %s", args.NodeName)

return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(err.Error()))

}

newMember := membership.Member{

ID: m.ID,

RaftAttributes: membership.RaftAttributes{PeerURLs: []string{args.NewPeerURL}},

}

members, err := h.svr.UpdateMember(ctx, newMember)

switch {

case err == membership.ErrPeerURLexists:

return echoCtx.JSON(http.StatusConflict, utils.NewResp().SetError(err.Error()))

case err == membership.ErrIDNotFound:

err = fmt.Errorf("no such member: %s", args.NodeName)

return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(err.Error()))

case err != nil:

log.Log.Errorf("error updating member %s (%v)", m.ID, err)

echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

default:

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))

}

return nil

}

  • UpdateMember方法先通过h.svr.IsLeader()判断isLeader,为false的话通过h.sendToLeader("PUT", req)转发给leader,为true的话则通过h.svr.UpdateMember来更新member

DeleteMember

kingbus/api/membership_handler.go

//DeleteMember implements remove a member from raft cluster

func (h *MembershipHandler) DeleteMember(echoCtx echo.Context) error {

args := struct {

NodeName string `json:"name"`

PeerURL string `json:"peer_url"`

}{}

err := echoCtx.Bind(&args)

if err != nil {

return echoCtx.JSON(http.StatusForbidden, err.Error())

}

isLeader := h.svr.IsLeader()

if isLeader == false {

req, err := json.Marshal(args)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

resp, err := h.sendToLeader("DELETE", req)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(resp.Data))

}

ctx, cancel := context.WithTimeout(context.Background(), h.timeout)

defer cancel()

m := h.cluster.MemberByName(args.NodeName)

if m == nil {

msg := fmt.Sprintf("No such member: %s", args.NodeName)

return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(msg))

}

log.Log.Debugf("DeleteMember:remove member id is %s", m.ID.String())

members, err := h.svr.RemoveMember(ctx, uint64(m.ID))

switch {

case err == membership.ErrIDRemoved:

msg := fmt.Sprintf("Member permanently removed: %s", args.NodeName)

return echoCtx.JSON(http.StatusGone, utils.NewResp().SetError(msg))

case err == membership.ErrIDNotFound:

msg := fmt.Sprintf("No such member: %s", args.NodeName)

return echoCtx.JSON(http.StatusNotFound, utils.NewResp().SetError(msg))

case err != nil:

log.Log.Errorf("error removing member %s (%v)", args.NodeName, err)

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(members))

}

  • DeleteMember方法先通过h.svr.IsLeader()判断isLeader,为false的话通过h.sendToLeader("DELETE", req)转发给leader,为true的话则通过h.svr.RemoveMember来删除member

GetCluster

kingbus/api/membership_handler.go

//GetCluster implements get information of raft cluster

func (h *MembershipHandler) GetCluster(echoCtx echo.Context) error {

members := h.cluster.Members()

roles := make([]*Role, 0, len(members))

for _, m := range members {

r := new(Role)

r.ID = fmt.Sprintf("%x", uint64(m.ID))

r.RaftAttributes = m.RaftAttributes

r.Attributes = m.Attributes

if uint64(m.ID) == uint64(h.svr.Leader()) {

r.IsLeader = true

} else {

r.IsLeader = false

}

roles = append(roles, r)

}

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(roles))

}

  • GetCluster方法通过h.cluster.Members()获取members,然后遍历members构造roles信息,最后json化返回

UpdateAdminURL

kingbus/api/membership_handler.go

//UpdateAdminURL implements update raft node admin url in raft cluster

func (h *MembershipHandler) UpdateAdminURL(echoCtx echo.Context) error {

var attributes config.Attributes

if h.svr.IsLeader() == false {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(ErrNotLeader.Error()))

}

err := echoCtx.Bind(&attributes)

if err != nil {

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

data, err := attributes.EncodeWithType()

if err != nil {

log.Log.Errorf("attributes EncodeWithType error,err:%v,attributes:%v", err, attributes)

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

err = h.svr.Propose(data)

if err != nil {

log.Log.Errorf("Propose attributes error,err:%s,attributes:%v", err, attributes)

return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))

}

return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))

}

  • UpdateAdminURL方法在h.svr.IsLeader()为false时直接返回error;之后主要执行h.svr.Propose(data)

小结

membership_handler.go提供了GetMembers、AddMember、UpdateMember、DeleteMember、GetCluster、UpdateAdminURL方法

doc

  • membership_handler

以上是 聊聊kingbus的membership_handler.go 的全部内容, 来源链接: utcz.com/z/517525.html

回到顶部