聊聊dubbogo的failbackCluster

编程

本文主要研究一下dubbo-go的failbackCluster

failbackCluster

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster.go

type failbackCluster struct{}

const failback = "failback"

func init() {

extension.SetCluster(failback, NewFailbackCluster)

}

// NewFailbackCluster ...

func NewFailbackCluster() cluster.Cluster {

return &failbackCluster{}

}

func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {

return newFailbackClusterInvoker(directory)

}

  • failbackCluster的join方法执行newFailbackClusterInvoker

newFailbackClusterInvoker

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

type failbackClusterInvoker struct {

baseClusterInvoker

once sync.Once

ticker *time.Ticker

maxRetries int64

failbackTasks int64

taskList *queue.Queue

}

func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {

invoker := &failbackClusterInvoker{

baseClusterInvoker: newBaseClusterInvoker(directory),

}

retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)

retries, err := strconv.Atoi(retriesConfig)

if err != nil || retries < 0 {

logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")

retries = constant.DEFAULT_FAILBACK_TIMES_INT

}

failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)

if failbackTasksConfig <= 0 {

failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS

}

invoker.maxRetries = int64(retries)

invoker.failbackTasks = failbackTasksConfig

return invoker

}

  • newFailbackClusterInvoker方法创建failbackClusterInvoker,并设置其maxRetries、failbackTasks属性

Invoke

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {

invokers := invoker.directory.List(invocation)

err := invoker.checkInvokers(invokers, invocation)

if err != nil {

logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.

",

invocation.MethodName(), invoker.GetUrl().Service(), err)

return &protocol.RPCResult{}

}

url := invokers[0].GetUrl()

methodName := invocation.MethodName()

//Get the service loadbalance config

lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)

//Get the service method loadbalance config if have

if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {

lb = v

}

loadbalance := extension.GetLoadbalance(lb)

invoked := make([]protocol.Invoker, 0, len(invokers))

var result protocol.Result

ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)

//DO INVOKE

result = ivk.Invoke(ctx, invocation)

if result.Error() != nil {

invoker.once.Do(func() {

invoker.taskList = queue.New(invoker.failbackTasks)

go invoker.process(ctx)

})

taskLen := invoker.taskList.Len()

if taskLen >= invoker.failbackTasks {

logger.Warnf("tasklist is too full > %d.

", taskLen)

return &protocol.RPCResult{}

}

timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)

invoker.taskList.Put(timerTask)

logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.

",

methodName, url.Service(), result.Error().Error())

// ignore

return &protocol.RPCResult{}

}

return result

}

  • Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过extension.GetLoadbalance(lb)获取loadbalance,然后通过invoker.doSelect(loadbalance, invocation, invokers, invoked)选择invoker,之后执行其Invoke方法,如果出现异常则设置invoker.taskList,异步执行invoker.process(ctx),之后通过newRetryTimerTask创建timerTask,添加到invoker.taskList

Destroy

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

func (invoker *failbackClusterInvoker) Destroy() {

invoker.baseClusterInvoker.Destroy()

// stop ticker

if invoker.ticker != nil {

invoker.ticker.Stop()

}

_ = invoker.taskList.Dispose()

}

  • Destroy方法执行invoker.baseClusterInvoker.Destroy()、invoker.ticker.Stop()、invoker.taskList.Dispose()

process

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

func (invoker *failbackClusterInvoker) process(ctx context.Context) {

invoker.ticker = time.NewTicker(time.Second * 1)

for range invoker.ticker.C {

// check each timeout task and re-run

for {

value, err := invoker.taskList.Peek()

if err == queue.ErrDisposed {

return

}

if err == queue.ErrEmptyQueue {

break

}

retryTask := value.(*retryTimerTask)

if time.Since(retryTask.lastT).Seconds() < 5 {

break

}

// ignore return. the get must success.

_, err = invoker.taskList.Get(1)

if err != nil {

logger.Warnf("get task found err: %v

", err)

break

}

go func(retryTask *retryTimerTask) {

invoked := make([]protocol.Invoker, 0)

invoked = append(invoked, retryTask.lastInvoker)

retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)

var result protocol.Result

result = retryInvoker.Invoke(ctx, retryTask.invocation)

if result.Error() != nil {

retryTask.lastInvoker = retryInvoker

invoker.checkRetry(retryTask, result.Error())

}

}(retryTask)

}

}

}

  • process方法通过time.NewTicker(time.Second * 1)创建invoker.ticker,之后从invoker.taskList.Peek()获取retryTask(之后Get方法进行poll),然后异步执行invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)选取retryInvoker,然后执行retryInvoker.Invoke(ctx, retryTask.invocation);如果执行出现异常,则通过invoker.checkRetry(retryTask, result.Error())进行check

checkRetry

dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go

func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {

logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.

",

retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())

retryTask.retries++

retryTask.lastT = time.Now()

if retryTask.retries > invoker.maxRetries {

logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.

",

retryTask.retries, retryTask.invocation)

} else {

invoker.taskList.Put(retryTask)

}

}

  • checkRetry方法会递增retryTask.retries,然后判断是否超过invoker.maxRetries,超过则记录error日志,不超过则再次将retryTask添加到invoker.taskList

小结

newFailbackClusterInvoker方法创建failbackClusterInvoker,并设置其maxRetries、failbackTasks属性;其Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过extension.GetLoadbalance(lb)获取loadbalance,然后通过invoker.doSelect(loadbalance, invocation, invokers, invoked)选择invoker,之后执行其Invoke方法,如果出现异常则设置invoker.taskList,异步执行invoker.process(ctx),之后通过newRetryTimerTask创建timerTask,添加到invoker.taskList

failbackCluster忽略result,针对失败的会加入队列重试maxRetries次,适合fireAndForget的通信模式

doc

  • failback_cluster

以上是 聊聊dubbogo的failbackCluster 的全部内容, 来源链接: utcz.com/z/519109.html

回到顶部