聊聊dubbo-go的forkingCluster

编程

本文主要研究一下dubbo-go的forkingCluster

forkingCluster

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster.go

// forkingCluster is the field-less cluster type registered under the forking
// key; its Join method builds a forking cluster invoker for a directory.
type forkingCluster struct{}

// forking is the name under which init registers NewForkingCluster.
const forking = "forking"

// init registers NewForkingCluster with the extension package under the
// forking key.
func init() {

extension.SetCluster(forking, NewForkingCluster)

}

// NewForkingCluster returns a pointer to a new forkingCluster as a
// cluster.Cluster. It is the factory that init registers under the forking key.
func NewForkingCluster() cluster.Cluster {
	return new(forkingCluster)
}

// Join builds a forking cluster invoker for the given directory by delegating
// to newForkingClusterInvoker and returns it as a protocol.Invoker.
func (c *forkingCluster) Join(directory cluster.Directory) protocol.Invoker {
	forkingInvoker := newForkingClusterInvoker(directory)
	return forkingInvoker
}

  • forkingCluster的Join方法执行newForkingClusterInvoker

newForkingClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

// forkingClusterInvoker is the invoker returned by forkingCluster.Join. It
// embeds baseClusterInvoker and adds no fields of its own.
type forkingClusterInvoker struct {

baseClusterInvoker

}

// newForkingClusterInvoker creates a forkingClusterInvoker whose embedded
// baseClusterInvoker is built from the given directory.
func newForkingClusterInvoker(directory cluster.Directory) protocol.Invoker {
	ivk := new(forkingClusterInvoker)
	ivk.baseClusterInvoker = newBaseClusterInvoker(directory)
	return ivk
}

  • newForkingClusterInvoker创建了forkingClusterInvoker

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/forking_cluster_invoker.go

// Invoke ...

func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {

err := invoker.checkWhetherDestroyed()

if err != nil {

return &protocol.RPCResult{Err: err}

}

invokers := invoker.directory.List(invocation)

err = invoker.checkInvokers(invokers, invocation)

if err != nil {

return &protocol.RPCResult{Err: err}

}

var selected []protocol.Invoker

forks := int(invoker.GetUrl().GetParamInt(constant.FORKS_KEY, constant.DEFAULT_FORKS))

timeouts := invoker.GetUrl().GetParamInt(constant.TIMEOUT_KEY, constant.DEFAULT_TIMEOUT)

if forks < 0 || forks > len(invokers) {

selected = invokers

} else {

selected = make([]protocol.Invoker, 0)

loadbalance := getLoadBalance(invokers[0], invocation)

for i := 0; i < forks; i++ {

ivk := invoker.doSelect(loadbalance, invocation, invokers, selected)

if ivk != nil {

selected = append(selected, ivk)

}

}

}

resultQ := queue.New(1)

for _, ivk := range selected {

go func(k protocol.Invoker) {

result := k.Invoke(ctx, invocation)

err := resultQ.Put(result)

if err != nil {

logger.Errorf("resultQ put failed with exception: %v.

", err)

}

}(ivk)

}

rsps, err := resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))

if err != nil {

return &protocol.RPCResult{

Err: fmt.Errorf("failed to forking invoke provider %v, "+

"but no luck to perform the invocation. Last error is: %v", selected, err),

}

}

if len(rsps) == 0 {

return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but no resp", selected)}

}

result, ok := rsps[0].(protocol.Result)

if !ok {

return &protocol.RPCResult{Err: fmt.Errorf("failed to forking invoke provider %v, but not legal resp", selected)}

}

return result

}

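  • Invoke方法先执行checkWhetherDestroyed,再通过invoker.directory.List(invocation)获取invokers并用checkInvokers校验,任一步出错都直接返回带Err的RPCResult;之后从invoker.GetUrl()获取forks及timeouts参数:若forks小于0或大于invokers的数量,则selected直接取全部invokers,否则循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;之后遍历selected异步执行其Invoke方法,并将结果放到resultQ中;最后通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))拉取最先返回的结果返回,若Poll出错、没有拉取到结果或结果类型不合法,则返回带Err的RPCResult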

小结

forkingCluster的Join方法执行newForkingClusterInvoker;其Invoke方法在forks小于0或大于invokers的数量时直接选取全部invokers,否则循环forks次通过invoker.doSelect(loadbalance, invocation, invokers, selected)选出selected的invokers;之后遍历selected异步执行其Invoke方法,并将结果放到resultQ中;最后通过resultQ.Poll(1, time.Millisecond*time.Duration(timeouts))拉取最先返回的结果返回

doc

  • forking_cluster

以上是 聊聊dubbogo的forkingCluster 的全部内容, 来源链接: utcz.com/z/519188.html

回到顶部