如何使用gracefulStop关闭所有grpc服务器流?

我正在尝试停止从服务器端连接到流服务器的所有客户端。其实我正在使用GracefulStop方法来优雅地处理它。

我正在等待os.Interrupt通道上的信号以对gRPC执行正常停止。但是server.GracefulStop()当客户端连接时会卡住。

func (s *Service) Subscribe(_ *empty.Empty, srv clientapi.ClientApi_SubscribeServer) error {

ctx := srv.Context()

updateCh := make(chan *clientapi.Update, 100)

stopCh := make(chan bool)

defer func() {

stopCh<-true

close(updateCh)

}

go func() {

ticker := time.NewTicker(1 * time.Second)

defer func() {

ticker.Stop()

close(stopCh)

}

for {

select {

case <-stopCh:

return

case <-ticker.C:

updateCh<- &clientapi.Update{Name: "notification": Payload: "sample notification every 1 second"}

}

}

}()

for {

select {

case <-ctx.Done():

return ctx.Err()

case notif := <-updateCh:

err := srv.Send(notif)

if err == io.EOF {

return nil

}

if err != nil {

s.logger.Named("Subscribe").Error("error", zap.Error(err))

continue

}

}

}

}

我希望contextin方法ctx.Done()可以处理它并中断for循环。如何关闭像这样的所有响应流?

回答:

为您的gRPC服务创建一个 全局context变量。因此,遍历各个部分:

  • 每个gRPC服务请求都将使用此上下文(以及客户端上下文)来满足该请求
  • os.Interrupt处理程序将取消全局上下文;从而取消任何当前正在运行的请求
  • 最终问题server.GracefulStop()-应该等待所有活动的gRPC调用完成(如果他们没有立即看到取消操作)

因此,例如,在设置gRPC服务时:

pctx := context.Background()

globalCtx, globalCancel := context.WithCancel(pctx)

mysrv := MyService{

gctx: globalCtx

}

s := grpc.NewServer()

pb.RegisterMyService(s, mysrv)

os.Interrupt 处理程序启动并等待关闭:

globalCancel()

server.GracefulStop()

gRPC方法:

func(s *MyService) SomeRpcMethod(ctx context.Context, req *pb.Request) error {

// merge client and server contexts into one `mctx`

// (client context will cancel if client disconnects)

// (server context will cancel if service Ctrl-C'ed)

mctx, mcancel := mergeContext(ctx, s.gctx)

defer mcancel() // so we don't leak, if neither client or server context cancels

// RPC WORK GOES HERE

// RPC WORK GOES HERE

// RPC WORK GOES HERE

// pass mctx to any blocking calls:

// - http REST calls

// - SQL queries etc.

// - or if running a long loop; status check the context occasionally like so:

// Example long request (10s)

for i:=0; i<10*1000; i++ {

time.Sleep(1*time.Milliscond)

// poll merged context

select {

case <-mctx.Done():

return fmt.Errorf("request canceled: %s", mctx.Err())

default:

}

}

}

和:

func mergeContext(a, b context.Context) (context.Context, context.CancelFunc) {

mctx, mcancel := context.WithCancel(a) // will cancel if `a` cancels

go func() {

select {

case <-mctx.Done(): // don't leak go-routine on clean gRPC run

case <-b.Done():

mcancel() // b canceled, so cancel mctx

}

}()

return mctx, mcancel

}

以上是 如何使用gracefulStop关闭所有grpc服务器流? 的全部内容, 来源链接: utcz.com/qa/426640.html

回到顶部