Concurrency: limiting goroutines doesn't work as expected

I'm working on a search-engine project. To get a faster crawl rate, I visit each link in its own goroutine. But I've run into two problems that surprised me!

The first one is a code sample:

package main

import (
	"fmt"
	"sync"
	"time"
)

type test struct {
	running int
	max     int
	mu      sync.Mutex
}

func main() {
	t := &test{max: 1000}
	t.start()
}

func (t *test) start() {
	for {
		if t.running >= t.max {
			time.Sleep(200 * time.Millisecond)
			continue
		}
		go t.visit()
	}
}

func (t *test) visit() {
	t.inc()
	defer t.dec()
	fmt.Println("visit called")
	fmt.Printf("running: %d, max: %d\n", t.running, t.max)
	fmt.Println()
	time.Sleep(time.Second)
}

func (t *test) inc() {
	t.mu.Lock()
	t.running++
	t.mu.Unlock()
}

func (t *test) dec() {
	t.mu.Lock()
	t.running--
	t.mu.Unlock()
}

Output (trimmed):

running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
visit called
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
running: 2485, max: 1000
visit called
running: 2485, max: 1000
running: 2485, max: 1000

Even though I explicitly check the maximum allowed number of goroutines in the loop, why does the number of running goroutines exceed the maximum?


The second one is part of the real project's code:

UPDATE: This one is actually fixed. The problem was in the LinkProvider.Get() implementation, which took too long to return. parser.visit() returned in the meantime, but the loop in Parser.Start() was still waiting for a new link... so the output looked sequential!
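To illustrate that UPDATE, here is a minimal, self-contained sketch (slowGet, visit, and all timings are made-up stand-ins, not the project's code): when the loop's link source blocks for longer than each visit takes, every goroutine finishes before the next one is even started, so the output looks sequential although nothing actually limits concurrency.

package main

import (
	"fmt"
	"time"
)

// slowGet is a hypothetical stand-in for LinkProvider.Get: it blocks
// for half a second before returning the next link.
func slowGet(i int) string {
	time.Sleep(500 * time.Millisecond)
	return fmt.Sprintf("link-%d", i)
}

// visit is a hypothetical stand-in for the real visit: it finishes
// much faster than slowGet blocks.
func visit(link string) {
	time.Sleep(100 * time.Millisecond)
	fmt.Println("visited", link)
}

func main() {
	for i := 0; i < 5; i++ {
		link := slowGet(i) // the loop spends most of its time blocked here
		go visit(link)     // each goroutine ends long before the next begins
	}
	time.Sleep(time.Second) // crude wait so the last goroutine can print
}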

package worker

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"bitbucket.org/codictive/ise/components/crawler/models"
	"bitbucket.org/codictive/ise/components/log/logger"
	"bitbucket.org/codictive/ise/core/component"
	"bitbucket.org/codictive/ise/core/database"
)

// Worker is a service that processes crawlable links.
type Worker interface {
	Start() error
	Stop() error
	Restart() error
	Status() Status
}

// Status contains the runtime status of a worker.
type Status struct {
	Running             bool
	RunningParsersCount int
}

// New returns a new defaultWorker with the given config.
func New() Worker {
	return &defaultWorker{
		flow: make(chan bool),
		stop: make(chan bool),
	}
}

// defaultWorker is a Worker implementation.
type defaultWorker struct {
	linkProvider         LinkProvider
	handlersLimit        int
	runningHandlersCount int
	running              bool
	mu                   sync.Mutex
	flow                 chan bool
	stop                 chan bool
}

func (w *defaultWorker) init() {
	prate, _ := component.IntConfig("crawler.crawlInterval")
	arate, _ := component.IntConfig("crawler.ad_crawlInterval")
	concLimit, _ := component.IntConfig("crawler.concurrent_workers_limit")
	w.linkProvider = NewLinkProvider(time.Duration(prate)*time.Hour, time.Duration(arate)*time.Hour)
	w.handlersLimit = concLimit
}

// Start runs the worker.
func (w *defaultWorker) Start() error {
	logger.Info("Starting crawler worker...")
	w.running = true
	w.init()
	defer func() {
		w.running = false
		logger.Info("Worker stopped.")
	}()
	for {
		select {
		case <-w.stop:
			w.flow <- true
			return nil
		default:
			fmt.Printf("running: %d limit: %d\n", w.runningHandlersCount, w.handlersLimit)
			if w.runningHandlersCount >= w.handlersLimit {
				time.Sleep(200 * time.Millisecond)
				continue
			}
			link := w.linkProvider.Get()
			if link.ID == 0 {
				logger.Debug("no link to crawl")
				time.Sleep(time.Minute)
				continue
			}
			go func(l *models.CrawlLink) {
				go w.visit(l)
			}(link)
		}
	}
}

// Stop stops the worker.
func (w *defaultWorker) Stop() error {
	logger.Info("Stopping crawler worker...")
	w.stop <- true
	select {
	case <-w.flow:
		return nil
	case <-time.After(2 * time.Minute):
		return errors.New("worker did not stop properly")
	}
}

// Restart re-starts the worker.
func (w *defaultWorker) Restart() error {
	logger.Info("Re-starting crawler worker...")
	w.stop <- true
	select {
	case <-w.flow:
		return w.Start()
	case <-time.After(2 * time.Minute):
		return errors.New("can not restart worker")
	}
}

// Status reports the current worker status.
func (w *defaultWorker) Status() Status {
	return Status{
		Running:             w.running,
		RunningParsersCount: w.runningHandlersCount,
	}
}

func (w *defaultWorker) visit(cl *models.CrawlLink) {
	w.incrementRunningWorkers()
	defer w.decrementRunningWorkers()
	if cl == nil {
		logger.Warning("[crawler.worker.visit] Can not visit a nil link.")
		return
	}
	if err := cl.LoadFull(); err != nil {
		logger.Error("[crawler.worker.visit] Can not load link relations. (%v)", err)
		return
	}
	parser := NewParser(cl)
	if parser == nil {
		logger.Error("[crawler.worker.visit] Parser instantiation failed.")
		return
	}
	before := time.Now()
	if err := parser.Parse(); err != nil {
		cl.Error = err.Error()
		logger.Error("[crawler.worker.visit] Parser finished with error: %v.", err)
		db := database.Open()
		if err := db.Save(&cl).Error; err != nil {
			logger.Error("[crawler.worker.visit] can not update crawl link. (%v)", err)
		}
	}
	logger.Debug("[crawler.worker.visit] Parsing %q took %s.", cl.URL, time.Since(before))
	fmt.Printf("[crawler.worker.visit] Parsing %q took %s.\n", cl.URL, time.Since(before))
}

func (w *defaultWorker) incrementRunningWorkers() {
	w.mu.Lock()
	w.runningHandlersCount++
	w.mu.Unlock()
	fmt.Printf("increment called. current: %d\n", w.runningHandlersCount)
}

func (w *defaultWorker) decrementRunningWorkers() {
	w.mu.Lock()
	w.runningHandlersCount--
	w.mu.Unlock()
	fmt.Printf("decrement called. current: %d\n", w.runningHandlersCount)
}

Output:

2017/12/03 11:24:36 profile: cpu profiling enabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof
running: 0 limit: 1000
Running server on :8080
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%81%D8%B1%D8%A7%D8%B4%D8%A8%D9%86%D8%AF/%D8%A7%D9%85%D9%84%D8%A7%DA%A9/%D9%81%D8%B1%D9%88%D8%B4-%D8%A7%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88-%D8%AA%D8%AC%D8%A7%D8%B1%DB%8C" took 370.140513ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D8%B3%D8%A7%D9%85%D8%B3%D9%88%D9%86%DA%AF-s3-neo-24252682.html" took 193.193357ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%85%DB%8C%D8%B2%D9%88%D8%B5%D9%86%D8%AF%D9%84%DB%8C-%D8%AA%D8%A7%D9%84%D8%A7%D8%B1-22399505.html" took 201.636741ms.
decrement called. current: 0
running: 0 limit: 1000
increment called. current: 1
[crawler.worker.visit] Parsing "https://www.sheypoor.com/50000%D9%85%D8%AA%D8%B1-%D8%B2%D9%85%DB%8C%D9%86-%D9%85%D8%B1%D8%BA%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88%D8%A7%D9%82%D8%B9-%D8%AF%D8%B1-%D8%AE%D8%B1%D9%85%D8%AF%D8%B1%D9%87-23075331.html" took 210.360596ms.
decrement called. current: 0
^C2017/12/03 11:24:43 profile: caught interrupt, stopping profiles
2017/12/03 11:24:43 profile: cpu profiling disabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof

As you can see, the visit method runs completely sequentially! It makes no difference whether I use a plain go w.visit(link) or the nested goroutine shown in the code above. Why does this happen? What is blocking the loop iterations?

Answer:

I would solve this problem with a channel and its blocking behavior - https://play.golang.org/p/KbYOI1oGNs

The main change is that we have a channel guard: we send a value into it whenever we start a goroutine for a new item (the send blocks once the channel's buffer reaches the limit), and we receive from it to release the slot when the goroutine finishes. This is also why your first example overshoots: start() reads t.running outside the mutex, and the tight loop keeps spawning goroutines before the newly started ones get scheduled and call inc(), so the count races far past max.

func (t *test) start() {
	maxGoroutines := t.max
	// guard acts as a counting semaphore: a send blocks once
	// maxGoroutines slots are taken.
	guard := make(chan struct{}, maxGoroutines)
	for {
		guard <- struct{}{} // blocks while the limit is reached
		go func() {
			t.visit()
			<-guard // release a slot when the visit finishes
		}()
	}
}
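For completeness, here is a self-contained variant of that fix (the bounded loop count, the small limit, the simulated work, and the sync.WaitGroup are additions for demonstration; the original loops forever):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxGoroutines = 3 // small limit so the blocking is easy to observe
	guard := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		guard <- struct{}{} // blocks while maxGoroutines visits are in flight
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer func() { <-guard }() // release the slot when the visit ends
			fmt.Printf("visit %d started\n", n)
			time.Sleep(200 * time.Millisecond) // simulated work
		}(i)
	}
	wg.Wait() // wait for the remaining visits before exiting
}

Because a send on a full buffered channel blocks, the loop itself can never have more than maxGoroutines visits in flight, so no mutex-protected counter is needed at all.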
