Go Worker pool 工作池实现详解
Channel缓冲的重要用途之一是实现工作池。
通常,工作池是等待分配给它们的任务的线程的集合。 一旦他们完成分配的任务,他们就会再次为下一个任务提供服务。
我们将使用缓冲通道实现一个工作池。 我们的工作池将执行计算输入数字的数字总和的任务。 例如,如果传递了 234,则输出将为 9 (2 + 3 + 4)。 工作池的输入将是一个伪随机整数列表。
下面是我们的工作池的核心功能
创建一个 Goroutines 池,它侦听输入缓冲通道,等待分配作业
将作业添加到输入缓冲通道
作业完成后将结果写入输出缓冲通道
从输出缓冲通道读取和打印结果
我们将逐步实现这个程序,以使其更易于理解。
第一步将是创建表示作业和结果的结构体
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
每个 Job
结构体都有一个 id 和一个 randomno ,必须为其计算各个数字的总和。
Result
结构体有一个 job 字段,该字段是在 sumofdigits 字段中保存结果(单个数字的总和)的作业。
下一步是创建用于接收作业和写入输出的缓冲通道。
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
Worker Goroutines 在 jobs 缓冲通道上监听新任务。 任务完成后,将结果写入 results 缓冲通道。
下面我们定义一个 digits
函数,该函数的主要功能是计算各个数字的总和并返回它。 我们将给这个函数添加一个 2 秒的睡眠,只是为了模拟这个函数需要一些时间来计算结果的事实。
funcdigits(number int)int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
接下来我们定义一个 worker
函数。
funcworker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
上述函数创建一个从 jobs 通道读取数据的工作协程,使用当前作业和 digits
函数的返回值创建一个 Result 结构体,然后将结果写入 results 缓冲通道。 此函数将 WaitGroup wg
作为参数,当所有作业完成后,它将调用 Done()
方法。
createWorkerPool
函数将创建一个 worker Goroutines 池。
funccreateWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
上面的函数将要创建的 worker 数量作为参数。 它在创建 Goroutine 之前调用 wg.Add(1)
来增加 WaitGroup 计数器。 然后它通过将 WaitGroup wg 的指针传递给 worker
函数来创建 worker Goroutines。 在创建所需的 Goroutines 后,它通过调用 wg.Wait()
等待所有 Goroutines 完成它们的执行。 在所有 Goroutines 执行完毕后,它关闭 results 通道,因为所有 Goroutines 都已完成它们的执行,并且没有其他的协程会进一步写入 results 通道。
现在我们已经准备好工作池,让我们继续定义一个用来进行分配的函数。
funcallocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
上面的 allocate
函数以要创建的任务的数量作为输入参数,生成最大值为998的伪随机数,以随机数和for循环计数器i为id创建Job结构体,然后将它们写入 jobs 通道。 它在写入所有作业后关闭 jobs 通道。
下一步是创建读取 results 通道并打印输出的函数。
funcresult(done chanbool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
result
函数读取 results 通道并打印 Job ID,输入的随机数,以及随机数的数字总和。 result 函数还将 接收done
通道作为参数,一旦它打印了所有结果,它就会写入该通道。
我们现在已经准备好了一切。 让我们继续完成最后一步,编写 main()
函数调用所有这些函数。
funcmain() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chanbool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
我们首先保存程序的执行开始时间,在最后我们计算 endTime 和 startTime 之间的时间差并显示程序花费的总时间。 这是必要的,因为我们将通过改变 Goroutines 的数量来做一些基准测试。
noOfJobs
设置为 100,然后调用 allocate
将作业添加到 jobs 通道。
然后创建 done
通道并将其传递给 result Goroutine,以便它可以开始打印输出并在打印完所有内容后通知主协程。
最后,通过调用 createWorkerPool
函数创建了一个包含 10 个 job Goroutines 的池,然后 main 在 done 通道上等待所有要打印的结果。
下面是完整的程序。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
funcdigits(number int)int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
funcworker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
funccreateWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
funcallocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
funcresult(done chanbool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
funcmain() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chanbool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
请在本地机器上运行此程序,以获得更准确的总时间计算。
上面程序执行结果如下
Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.025220391 seconds
上面程序总共将打印 100 行,对应 100 个作业,最后将在最后一行打印程序运行所需的总时间。 没个人的输出将与这里的结果不同,因为 Goroutine 可以以任何顺序运行,并且总时间也会因硬件而异。 就我而言,程序完成大约需要 20 秒。
现在让我们将主函数中的 noOfWorkers 增加到 20。我们已经将 worker 的数量增加了一倍。 由于 worker Goroutines 增加了(准确地说是翻了一番),程序完成所需的总时间应该减少(准确地说是减少一半)。
在我的机器上运行时间是 10.00877495。
现在我们可以理解,随着 worker Goroutine 数量的增加,完成作业所需的总时间减少。我们可以将 main 函数中的 noOfJobs 和 noOfWorkers 设置为不同的值并分析运行结果。
本文转载自:迹忆客(https://www.jiyik.com)
以上是 Go Worker pool 工作池实现详解 的全部内容, 来源链接: utcz.com/z/290244.html