并发性:限制goroutines不能按预期工作
我正在处理搜索引擎项目。为了更快的抓取速度,我使用每个链接访问一个goroutine。但是我遇到了两个问题,让我感到惊讶!并发性:限制goroutines不能按预期工作
第一个是一个代码示例:
package main import "fmt"
import "sync"
import "time"
type test struct {
running int
max int
mu sync.Mutex
}
func main() {
t := &test{max: 1000}
t.start()
}
func (t *test) start() {
for {
if t.running >= t.max {
time.Sleep(200 * time.Millisecond)
continue
}
go t.visit()
}
}
func (t *test) visit() {
t.inc()
defer t.dec()
fmt.Println("visit called")
fmt.Printf("running: %d, max: %d\n", t.running, t.max)
fmt.Println()
time.Sleep(time.Second)
}
func (t *test) inc() {
t.mu.Lock()
t.running++
t.mu.Unlock()
}
func (t *test) dec() {
t.mu.Lock()
t.running--
t.mu.Unlock()
}
输出(裁剪):
running: 2485, max: 1000 running: 2485, max: 1000
running: 2485, max: 1000
visit called
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
visit called
running: 2485, max: 1000
running: 2485, max: 1000
虽然我明确地检查循环中最大允许够程,为什么跑够程超过最大?
第二个是真正的项目代码的一部分:
UPDATE:这实际上是固定的,问题是在LinkProvider.Get()
实现,时间太长,返回。 parser.visit()
同时返回,但Parser.Start()
中的循环正在等待新链接...并且输出看起来是连续的!
package worker import (
"errors"
"fmt"
"sync"
"time"
"bitbucket.org/codictive/ise/components/crawler/models"
"bitbucket.org/codictive/ise/components/log/logger"
"bitbucket.org/codictive/ise/core/component"
"bitbucket.org/codictive/ise/core/database"
)
// Worker is a service that processes crawlable links.
type Worker interface {
Start() error
Stop() error
Restart() error
Status() Status
}
// Status contains runtime status of a worker.
type Status struct {
Running bool
RunningParsersCount int
}
// New return a new defaultWorker with given config.
func New() Worker {
return &defaultWorker{
flow: make(chan bool),
stop: make(chan bool),
}
}
// defaultWorker is a Worker implementation.
type defaultWorker struct {
linkProvider LinkProvider
handlersLimit int
runningHandlersCount int
running bool
mu sync.Mutex
flow chan bool
stop chan bool
}
func (w *defaultWorker) init() {
prate, _ := component.IntConfig("crawler.crawlInterval")
arate, _ := component.IntConfig("crawler.ad_crawlInterval")
concLimit, _ := component.IntConfig("crawler.concurrent_workers_limit")
w.linkProvider = NewLinkProvider(time.Duration(prate)*time.Hour, time.Duration(arate)*time.Hour)
w.handlersLimit = concLimit
}
// Start runs worker.
func (w *defaultWorker) Start() error {
logger.Info("Starting crawler worker...")
w.running = true
w.init()
defer func() {
w.running = false
logger.Info("Worker stopped.")
}()
for {
select {
case <-w.stop:
w.flow <- true
return nil
default:
fmt.Printf("running: %d limit: %d\n", w.runningHandlersCount, w.handlersLimit)
if w.runningHandlersCount >= w.handlersLimit {
time.Sleep(200 * time.Millisecond)
continue
}
link := w.linkProvider.Get()
if link.ID == 0 {
logger.Debug("no link to crawl")
time.Sleep(time.Minute)
continue
}
go func(l *models.CrawlLink) {
go w.visit(l)
}(link)
}
}
}
// Stop stops worker.
func (w *defaultWorker) Stop() error {
logger.Info("Stopping crawler worker...")
w.stop <- true
select {
case <-w.flow:
return nil
case <-time.After(2 * time.Minute):
return errors.New("worker did not stopped properly")
}
}
// Restart re-starts worker.
func (w *defaultWorker) Restart() error {
logger.Info("Re-starting crawler worker...")
w.stop <- true
select {
case <-w.flow:
return w.Start()
case <-time.After(2 * time.Minute):
return errors.New("can not restart worker")
}
}
// Status reports current worker status.
func (w *defaultWorker) Status() Status {
return Status{
Running: w.running,
RunningParsersCount: w.runningHandlersCount,
}
}
func (w *defaultWorker) visit(cl *models.CrawlLink) {
w.incrementRunningWorkers()
defer w.decrementRunningWorkers()
if cl == nil {
logger.Warning("[crawler.worker.visit] Can not visit a nil link.")
return
}
if err := cl.LoadFull(); err != nil {
logger.Error("[crawler.worker.visit] Can not load link relations. (%v)", err)
return
}
parser := NewParser(cl)
if parser == nil {
logger.Error("[crawler.worker.visit] Parser instantiation failed.")
return
}
before := time.Now()
if err := parser.Parse(); err != nil {
cl.Error = err.Error()
logger.Error("[crawler.worker.visit] Parser finished with error: %v.", err)
db := database.Open()
if err := db.Save(&cl).Error; err != nil {
logger.Error("[crawler.worker.visit] can not update crawl link. (%v)", err)
}
}
logger.Debug("[crawler.worker.visit] Parsing %q took %s.", cl.URL, time.Since(before))
fmt.Printf("[crawler.worker.visit] Parsing %q took %s.\n", cl.URL, time.Since(before))
}
func (w *defaultWorker) incrementRunningWorkers() {
w.mu.Lock()
w.runningHandlersCount++
w.mu.Unlock()
fmt.Printf("increment called. current: %d\n", w.runningHandlersCount)
}
func (w *defaultWorker) decrementRunningWorkers() {
w.mu.Lock()
w.runningHandlersCount--
w.mu.Unlock()
fmt.Printf("decrement called. current: %d\n", w.runningHandlersCount)
}
输出:
2017/12/03 11:24:36 profile: cpu profiling enabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof running: 0 limit: 1000
Running server on :8080
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%81%D8%B1%D8%A7%D8%B4%D8%A8%D9%86%D8%AF/%D8%A7%D9%85%D9%84%D8%A7%DA%A9/%D9%81%D8%B1%D9%88%D8%B4-%D8%A7%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88-%D8%AA%D8%AC%D8%A7%D8%B1%DB%8C" took 370.140513ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D8%B3%D8%A7%D9%85%D8%B3%D9%88%D9%86%DA%AF-s3-neo-24252682.html" took 193.193357ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%85%DB%8C%D8%B2%D9%88%D8%B5%D9%86%D8%AF%D9%84%DB%8C-%D8%AA%D8%A7%D9%84%D8%A7%D8%B1-22399505.html" took 201.636741ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/50000%D9%85%D8%AA%D8%B1-%D8%B2%D9%85%DB%8C%D9%86-%D9%85%D8%B1%D8%BA%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88%D8%A7%D9%82%D8%B9-%D8%AF%D8%B1-%D8%AE%D8%B1%D9%85%D8%AF%D8%B1%D9%87-23075331.html" took 210.360596ms.
decrement called. current: 0
^C2017/12/03 11:24:43 profile: caught interrupt, stopping profiles
2017/12/03 11:24:43 profile: cpu profiling disabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof
正如你可以看到visit
方法运行完全顺序!无论我是只用go visit(link)
还是上面的代码中使用的名称。 为什么会发生这种情况?什么是阻止循环迭代?
回答:
我会使用渠道和拦截功能解决了这个问题 - https://play.golang.org/p/KbYOI1oGNs
主要的变化是,我们有一个通道guard
,我们把有新项目时够程启动(如果大小达到限制它会阻止) ,完成后释放。
func (t *test) start() { maxGoroutines := t.max
guard := make(chan struct{}, maxGoroutines)
for {
guard <- struct{}{}
go func() {
t.visit()
<-guard
}()
}
}
以上是 并发性:限制goroutines不能按预期工作 的全部内容, 来源链接: utcz.com/qa/260010.html