多个生产者,一个消费者:所有goroutine都在睡眠中-死锁

在进行工作之前,我一直遵循一种检查通道中是否有东西的模式:

func consume(msg <-chan message) {

for {

if m, ok := <-msg; ok {

fmt.Println("More messages:", m)

} else {

break

}

}

}

这是基于这部视频的。这是我的完整代码:

package main

import (

"fmt"

"strconv"

"strings"

"sync"

)

type message struct {

body string

code int

}

var markets []string = []string{"BTC", "ETH", "LTC"}

// produces messages into the chan

func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {

// for i := 0; i < n; i++ {

var msgToSend = message{

body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),

code: 1,

}

fmt.Println("Producing:", msgToSend)

msg <- msgToSend

// }

wg.Done()

}

func receive(msg <-chan message, wg *sync.WaitGroup) {

for {

if m, ok := <-msg; ok {

fmt.Println("Received:", m)

} else {

fmt.Println("Breaking from receiving")

break

}

}

wg.Done()

}

func main() {

wg := sync.WaitGroup{}

msgC := make(chan message, 100)

defer func() {

close(msgC)

}()

for ix, market := range markets {

wg.Add(1)

go produce(ix+1, market, msgC, &wg)

}

wg.Add(1)

go receive(msgC, &wg)

wg.Wait()

}

如果您尝试运行它,则在打印将要中断的消息之前,我们最终将陷入僵局。自上次以来,当chan中没有其他内容时,tbh才有意义,因此我们试图拉出该值,因此出现此错误。但是这样的模式是不可行的if

m, ok := <- msg; ok。我如何使此代码起作用以及为什么会出现此死锁错误(大概此模式应该起作用?)。

回答:

鉴于您在一个频道上确实有多个作者,因此您会遇到一些挑战,因为在Go中执行此操作的简单方法通常是在一个频道上拥有一个作者,然后让该作者关闭频道。发送最后一个数据时的通道:

func produce(... args including channel) {

defer close(ch)

for stuff_to_produce {

ch <- item

}

}

这种模式具有很好的特性,无论您如何退出produce,通道都会关闭,从而指示生产已结束。

您没有使用这种模式,而是向多个goroutine传递了一个通道,每个goroutine都可以发送 一条

消息,因此您需要移动close(当然,也可以使用其他模式)。表达所需模式的最简单方法是:

func overall_produce(... args including channel ...) {

var pg sync.WaitGroup

defer close(ch)

for stuff_to_produce {

pg.Add(1)

go produceInParallel(ch, &pg) // add more args if appropriate

}

pg.Wait()

}

pg计数器累计活跃的生产者。每个调用都必须使用pg.Done()来表明已完成ch。总制片人现在等待他们全部完成,那么

关闭的道路上走出通道。

(如果将内部produceInParallel函数编写为闭包,则无需显式传递ch并传递pg给它。也可以将其编写overallProducer为闭包。)

请注意,单个使用者的循环可能最好使用以下for ... range结构来表示:

func receive(msg <-chan message, wg *sync.WaitGroup) {

for m := range msg {

fmt.Println("Received:", m)

}

wg.Done()

}

(您提到了select向循环添加a的意图,以便在消息尚未准备好时可以执行其他一些计算。如果无法将该代码分解为独立的goroutine,则实际上您将需要更高级的m,

ok := <-msg构造。)

还要注意,wgfor

receive(取决于您构造其他事物的方式可能是不必要pg的)与生产者的等待组非常独立。诚然,按照书面说明,只有在所有生产者都完成之后才能完成消费者的工作,但我们希望独立等待生产者完成,以便我们可以关闭整体生产者包装中的渠道。

以上是 多个生产者,一个消费者:所有goroutine都在睡眠中-死锁 的全部内容, 来源链接: utcz.com/qa/402896.html

回到顶部