聊聊kingbus的starRaft

编程

本文主要研究一下kingbus的starRaft

starRaft

kingbus/server/server.go

// starRaft wires up the raft layer of the server: it opens the disk storage,
// starts (or restarts) the etcd raft node, initialises the raft-related fields
// of s, and starts the rafthttp transport with all known remotes and peers.
//
// Which start-up path is taken depends on whether a raft log already exists in
// cfg.DataDir (logExist) and on cfg.NewCluster:
//   - no log, NewCluster == false: join an existing cluster;
//   - no log, NewCluster == true:  bootstrap a new cluster;
//   - log exists:                  restart from the state held in storage.
//
// The result is a named value so that the deferred cleanup below observes
// every error returned from this function, including the ones built directly
// with fmt.Errorf, and closes the storage on all failure paths.
func (s *KingbusServer) starRaft(cfg config.RaftNodeConfig) (err error) {
	var (
		etcdRaftNode etcdraft.Node
		id           types.ID
		cl           *membership.RaftCluster
		remotes      []*membership.Member
		appliedIndex uint64
	)

	prt, err := rafthttp.NewRoundTripper(transport.TLSInfo{}, DialTimeout)
	if err != nil {
		return err
	}

	store, err := storage.NewDiskStorage(cfg.DataDir, cfg.ReserveDataSize)
	if err != nil {
		log.Log.Fatalf("NewKingbusServer:NewDiskStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
	}
	defer func() {
		// Close the storage whenever starRaft fails after opening it.
		if err != nil {
			store.Close()
		}
	}()

	logExist := utils.ExistLog(cfg.DataDir)
	switch {
	case !logExist && !cfg.NewCluster:
		// Join an existing cluster: fetch its membership from the remote
		// peers and adopt its cluster ID and member IDs.
		if err = cfg.VerifyJoinExisting(); err != nil {
			return err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
		if err != nil {
			return err
		}
		remotePeerURLs := membership.GetRemotePeerURLs(cl, cfg.Name)
		existingCluster, gerr := membership.GetClusterFromRemotePeers(remotePeerURLs, prt)
		if gerr != nil {
			return fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
		}
		if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
			return fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
		}
		remotes = existingCluster.Members()
		cl.SetID(existingCluster.GetID())
		cl.SetStore(store)
		id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, nil)

	case !logExist && cfg.NewCluster:
		// Bootstrap a brand-new cluster from the initial peer URL map.
		if err = cfg.VerifyBootstrap(); err != nil {
			return err
		}
		cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
		if err != nil {
			return err
		}
		m := cl.MemberByName(cfg.Name)
		if membership.IsMemberBootstrapped(cl, cfg.Name, prt, DialTimeout) {
			return fmt.Errorf("member %s has already been bootstrapped", m.ID)
		}
		cl.SetStore(store)
		id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, cl.MemberIDs())

	case logExist:
		if err = utils.IsDirWriteable(cfg.DataDir); err != nil {
			return fmt.Errorf("cannot write to member directory: %v", err)
		}
		// Node restart: read the applied index back from storage and hand it
		// to the raft config through cfg.
		appliedIndex = raft.MustGetAppliedIndex(store)
		cfg.AppliedIndex = appliedIndex
		id, etcdRaftNode, cl = restartEtcdNode(cfg, store)
		cl.SetStore(store)

	default:
		return fmt.Errorf("unsupported bootstrap config")
	}

	s.raftNode = raft.NewNode(
		raft.NodeConfig{
			IsIDRemoved: func(rid uint64) bool { return cl.IsIDRemoved(types.ID(rid)) },
			Node:        etcdRaftNode,
			Heartbeat:   cfg.HeartbeatMs,
			Storage:     store,
		},
	)

	// committedIndex and term are updated later by the fsm
	// (UpdateCommittedIndex, SetTerm). appliedIndex is set here because
	// applyEntries checks entry continuity against it.
	s.raftNode.SetAppliedIndex(appliedIndex)

	s.id = id
	s.wait = wait.New()
	s.reqIDGen = idutil.NewGenerator(uint16(id), time.Now())
	s.stopping = make(chan struct{})
	s.errorc = make(chan error)
	s.applyBroadcast = utils.NewBroadcast()
	s.stats = stats.NewServerStats(cfg.Name, id.String())
	s.lstats = stats.NewLeaderStats(id.String())
	s.store = store

	tr := &rafthttp.Transport{
		TLSInfo:     transport.TLSInfo{},
		DialTimeout: DialTimeout,
		ID:          id,
		URLs:        cfg.PeerURLs,
		ClusterID:   cl.GetID(),
		Raft:        s,
		ServerStats: s.stats,
		LeaderStats: s.lstats,
		ErrorC:      s.errorc,
	}
	if err = tr.Start(); err != nil {
		return err
	}

	// Add all remotes into the transport.
	// Remotes help newly joined members catch up with the progress of the
	// cluster. They support basic message sending only and have no stream
	// connection, for simplicity. Remotes are no longer used once the latest
	// peers have been added into rafthttp.
	for _, m := range remotes {
		if m.ID != id {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	for _, m := range cl.Members() {
		if m.ID != id {
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}

	s.raftNode.Transport = tr
	s.cluster = cl
	return nil
}

  • starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper,再通过storage.NewDiskStorage创建DiskStorage,然后根据logExist及cfg.NewCluster分三种情况处理:若logExist与cfg.NewCluster都为false(加入已有集群),则从远端peer获取现有集群信息,校验后把membership.RaftCluster的id设置为现有cluster的id,再以nil作为ids执行startEtcdRaftNode;若logExist为false且cfg.NewCluster为true(引导新集群),则以cl.MemberIDs()作为ids执行startEtcdRaftNode;若logExist为true(节点重启,此时不再看cfg.NewCluster),则先读取appliedIndex再执行restartEtcdNode;最后创建raft.Node与rafthttp.Transport,依次执行tr.Start()、tr.AddRemote、tr.AddPeer

startEtcdRaftNode

kingbus/server/server.go

// startEtcdRaftNode starts a new etcd raft node for the local member named
// cfg.Name in cluster cl.
//
// ids lists the members used to build the initial raft peer set; each peer's
// Context carries the JSON encoding of the corresponding cluster member. A nil
// or empty ids yields an empty peer list.
//
// It returns the local member's ID and the started node. If cfg.Name is not a
// member of cl the process is terminated through log.Log.Fatalf, matching the
// check done in restartEtcdNode, rather than failing later with a nil pointer
// dereference.
func startEtcdRaftNode(cfg config.RaftNodeConfig, store storage.Storage, cl *membership.RaftCluster, ids []types.ID) (
	id types.ID, n etcdraft.Node) {
	member := cl.MemberByName(cfg.Name)
	if member == nil {
		log.Log.Fatalf("startEtcdRaftNode:member not in raft cluster,cluster:%v,memberName:%s",
			cl.String(), cfg.Name)
	}

	peers := make([]etcdraft.Peer, len(ids))
	for i, peerID := range ids {
		ctx, err := json.Marshal(cl.Member(peerID))
		if err != nil {
			log.Log.Panicf("marshal member should never fail: %v", err)
		}
		peers[i] = etcdraft.Peer{ID: uint64(peerID), Context: ctx}
	}

	id = member.ID
	log.Log.Infof("starting member %s in cluster %s", id, cl.GetID())

	c := &etcdraft.Config{
		ID: uint64(id),
		// One raft tick is one heartbeat, so the election timeout is
		// expressed as a number of heartbeat intervals.
		ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
		HeartbeatTick:             1,
		Storage:                   store,
		MaxSizePerMsg:             cfg.MaxRequestBytes,
		MaxInflightMsgs:           maxInflightMsgs,
		CheckQuorum:               true,
		PreVote:                   cfg.PreVote,
		DisableProposalForwarding: true,
		Logger:                    log.Log,
	}

	n = etcdraft.StartNode(c, peers)
	// NOTE(review): presumably advances the ticks so the first election is
	// not delayed by a full election timeout — confirm against raft.AdvanceTicks.
	raft.AdvanceTicks(n, c.ElectionTick)
	return id, n
}

  • startEtcdRaftNode方法通过指定的ids创建peers,之后执行etcdraft.StartNode及raft.AdvanceTicks

restartEtcdNode

kingbus/server/server.go

// restartEtcdNode restarts the etcd raft node of a member that already has
// state in store.
//
// The cluster membership is loaded from store, the local member is looked up
// by cfg.Name, and the node is restarted with cfg.AppliedIndex as its applied
// index. It returns the local member's ID, the restarted node and the cluster
// read from storage.
//
// Failure to load the cluster panics through log.Log.Panicf; a local member
// missing from the loaded cluster is reported through log.Log.Fatalf.
func restartEtcdNode(cfg config.RaftNodeConfig, store storage.Storage) (
	types.ID, etcdraft.Node, *membership.RaftCluster) {
	cl, err := membership.GetRaftClusterFromStorage(store)
	if err != nil {
		// Panicf, not Panic: the message contains a format verb.
		log.Log.Panicf("GetRaftClusterFromStorage error:%s", err.Error())
	}
	log.Log.Debugf("restartEtcdNode:get raft cluster from storage,cluster:%v", cl.String())

	// The node ID comes from the stored cluster membership.
	member := cl.MemberByName(cfg.Name)
	if member == nil {
		log.Log.Fatalf("restartEtcdNode:member not in raft cluster,cluster:%v,memberName:%s",
			cl.String(), cfg.Name)
	}

	c := &etcdraft.Config{
		ID: uint64(member.ID),
		// One raft tick is one heartbeat, so the election timeout is
		// expressed as a number of heartbeat intervals.
		ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
		HeartbeatTick:             1,
		Applied:                   cfg.AppliedIndex, // applied index supplied by the caller via cfg
		Storage:                   store,
		MaxSizePerMsg:             cfg.MaxRequestBytes,
		MaxInflightMsgs:           maxInflightMsgs,
		CheckQuorum:               true,
		PreVote:                   cfg.PreVote,
		DisableProposalForwarding: true,
		Logger:                    log.Log,
	}

	n := etcdraft.RestartNode(c)
	return member.ID, n, cl
}

  • restartEtcdNode方法通过membership.GetRaftClusterFromStorage(store)获取RaftCluster,之后通过cl.MemberByName(cfg.Name)获取Member,然后使用member.ID构造etcdraft.Config,最后根据etcdraft.Config执行etcdraft.RestartNode

小结

starRaft方法先通过rafthttp.NewRoundTripper创建http.RoundTripper,再通过storage.NewDiskStorage创建DiskStorage,然后根据logExist及cfg.NewCluster分三种情况处理:若logExist与cfg.NewCluster都为false(加入已有集群),则从远端peer获取现有集群信息,校验后把membership.RaftCluster的id设置为现有cluster的id,再以nil作为ids执行startEtcdRaftNode;若logExist为false且cfg.NewCluster为true(引导新集群),则以cl.MemberIDs()作为ids执行startEtcdRaftNode;若logExist为true(节点重启,此时不再看cfg.NewCluster),则先读取appliedIndex再执行restartEtcdNode;最后创建raft.Node与rafthttp.Transport,依次执行tr.Start()、tr.AddRemote、tr.AddPeer

doc

  • server.go

以上是 聊聊kingbus的starRaft 的全部内容, 来源链接: utcz.com/z/517417.html

回到顶部