Golang协程并发的流水线模型

背景

最近由于性能问题,后端服务一直在做python到golang的迁移和重构。go语言精简优雅,既有编译型语言的严谨和高性能,又有解释型语言的开发效率,出色的并发性能也是go区别于其他语言的一大特色。go的并发编程代码虽然简单,但重在其并发模型和流程的设计。所以这里总结下golang协程并发常用的流水线模型。

简单的流水线思维

流水线模式并不是什么新奇的概念,但是它能极大地提高生产效率。比如实际生活中的汽车生产流水线,流水线上的每一个流程负责不同的工作,比如第一个流程是拼装车身,第二个流程是安装发动机,第三个流程是装轮胎...,这些步骤我们可以类比成go并发流程中的协程,每一个协程就是一个任务。流水线上面传递的车身、发动机、轮胎,这些我们可以类比成协程间需要传递的数据,而在这些流程(协程)间传递这些配件(数据),自然就要通过传送带(channel)。在流水线上,我们装四个轮胎肯定不是一个一个来装的,肯定是有四个机械臂同时来装。因此装轮胎这个步骤我们有4个协程在并发工作来提高效率。这么一来,流水线模型的基本要素就构成了。
Golang的并发模型灵感其实都来自我们生活,对程序而言,高的生产效率就是高的性能。在Golang中,流水线由多个流程节点组成,流程之间通过channel连接,每个流程节点可以由多个同时运行的goroutine组成。
Golang协程并发的流水线模型

如何构造流水线

有了流水线模式的思维,接下来就是如何构造流水线了。简单来说,其实就是通过channel将任务流程连接起来,两个相邻的流程互为生产者和消费者,通过channel进行通信。耗时的流程可以将任务分散到多个协程来执行。
我们先来看一个最简单的流水线,如下图,A是生产者流程,B是它的消费流程,同时又是C的生产者流程。A,B,C三个协程直接,通过读写channel进行通信。
Golang协程并发的流水线模型

那如果此时B流程可以将a channel中的任务并发执行呢,很简单,我们只需要起多个B协程就可以了。如下图。
Golang协程并发的流水线模型

总之,我们构造流水线并发的思路是关注数据的流动,数据流动的过程交给channel,channel两端数据处理的每个环节都交给goroutine,这个流程连起来,就构成了流水线模型。

关于channel

为什么我们可以选择channel来进行协程间的通信呢,协程之间又是怎么保持同步顺序呢,当然这都要归功于channel。channel是go提供的进程内协程间的通信方式,它是协程/线程安全的,channe的读写阻塞会导致协程的切换。
channel的操作和状态组合可以有以下几种情况:
Golang协程并发的流水线模型

**有1个特殊场景**:当`nil`的通道在`select`的某个`case`中时,这个case会阻塞,但不会造成死锁。

channel不仅可以保证协程安全的数据流动,还可以保证协程的同步。当有并发问题时,channel也是我们首先应该想到的数据结构。不过,当使用有缓冲区的channel时,才能达到协程并发的效果,并且生产者和消费者的协程间是相对同步的。使用无缓冲区的channel时,是没有并发效果的,协程间是绝对同步的,生产者和消费者必须同时写和读协程才能运行。
channel关注的是数据的流动,这种场景下都可以考虑使用channel。比如:消息传递、信号广播、任务分发、结果汇总、同步与异步、并发控制... 更多的不在这里赘述了,总之,Share memory by communicating, don't communicate by sharing memory.

流水线模型实例

举个简单栗子,计算80000以内的质数并输出。
这个例子如果我们采用非并发的方式,就是for循环80000,挨个判断是不是素数再输出。不过如果我们采用流水线的并发模型会更高效。
从数据流动的角度来分析,需要遍历生成1-80000的数字到一个channel中,数字判断是否为素数,输出结果到一个channel中。因此我们需要两个channel,channel的两端就设计成协程即可。
1、遍历生成原始80000个数据(生产者)
2、计算这80000个数据中的素数(生产者+消费者)
3、取结果输出(消费者)

package gen_channel

import "fmt"

import "time"

func generate_source(data_source_chan chan int) {

for i := 1; i <= 80000; i++ {

data_source_chan <- i

}

fmt.Println("写入协程结束")

close(data_source_chan)

}

func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) {

for num:= range data_source_chan {

falg := true

for i := 2; i < num; i++ {

if num%i == 0 {

falg = false

break }

}

if falg == true {

data_result_chan <- num

}

}

fmt.Println("该协程结束")

gen_chan <- true

}

func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){

// 开启8个协程

for i := 0; i < gen_num; i++ {

go generate_sushu(data_source_chan, data_result_chan, gen_chan)

}

}

func Channel_main() {

data_source_chan := make(chan int, 2000)

data_result_chan := make(chan int, 2000)

gen_chan := make(chan bool, 8)

time1 := time.Now().Unix()

go generate_source(data_source_chan)

// 协程池,任务分发

workpool(data_source_chan, data_result_chan, gen_chan, 8)

go func() {

for i := 0; i < 8; i++ {

<-gen_chan

}

close(data_result_chan)

fmt.Println("spend timeis ", time.Now().Unix()-time1)

}()

for date_result := range data_result_chan {

fmt.Println(date_result)

}

}

以上是 Golang协程并发的流水线模型 的全部内容, 来源链接: utcz.com/a/66670.html

回到顶部