如何有效地“最大化”并发HTTP请求?

我目前正在尝试使用Go进行一些实验。这是我正在尝试做的事情:

我有一个REST API服务正在运行,我想在尽可能多的Goroutine中反复查询特定的URL,以查看这些响应的性能如何(通过查看我的REST

API服务器日志)。在退出程序之前,我想发送总计100万个HTTP请求" title="HTTP请求">HTTP请求-在计算机允许的范围内同时执行。

我知道有一些工具可以做到这一点,但是我主要对如何使用goroutines在Go中最大化HTTP并发性感兴趣。

这是我的代码:

package main

import (

"fmt"

"net/http"

"runtime"

"time"

)

func main() {

runtime.GOMAXPROCS(runtime.NumCPU())

transport := &http.Transport{}

for i := 0; i < 1000000; i++ {

go func() {

req, _ := http.NewRequest("GET", "http://myapi.com", nil)

req.Header.Set("User-Agent", "custom-agent")

req.SetBasicAuth("xxx", "xxx")

resp, err := transport.RoundTrip(req)

if err != nil {

panic("HTTP request failed.")

}

defer resp.Body.Close()

if resp.StatusCode != 302 {

panic("Unexpected response returned.")

}

location := resp.Header.Get("Location")

if location == "" {

panic("No location header returned.")

}

fmt.Println("Location Header Value:", location)

}()

}

time.Sleep(60 * time.Second)

}

我期望这段代码能做的是:

  • 启动1,000,000个goroutine,每个例程向我的API服务发出HTTP请求。
  • 在我所有的CPU上同时运行goroutines(因为我使用了运行时包来增加GOMAXPROCS设置)。

但是,发生的是我遇到以下错误(要粘贴的错误太多,因此只包含了一部分输出):

goroutine 16680 [IO wait]:

net.runtime_pollWait(0xcb1d878, 0x77, 0x0)

/usr/local/Cellar/go/1.2/libexec/src/pkg/runtime/netpoll.goc:116 +0x6a

net.(*pollDesc).Wait(0xc212a86ca0, 0x77, 0x55d0c0, 0x24)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:81 +0x34

net.(*pollDesc).WaitWrite(0xc212a86ca0, 0x24, 0x55d0c0)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_poll_runtime.go:90 +0x30

net.(*netFD).connect(0xc212a86c40, 0x0, 0x0, 0xb4c97e8, 0xc212a84500, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:86 +0x166

net.(*netFD).dial(0xc212a86c40, 0xb4c87d8, 0x0, 0xb4c87d8, 0xc212a878d0, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:121 +0x2fd

net.socket(0x2402c0, 0x3, 0x2, 0x1, 0x0, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/sock_posix.go:91 +0x40b

net.internetSocket(0x2402c0, 0x3, 0xb4c87d8, 0x0, 0xb4c87d8, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/ipsock_posix.go:136 +0x161

net.dialTCP(0x2402c0, 0x3, 0x0, 0xc212a878d0, 0x0, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/tcpsock_posix.go:155 +0xef

net.dialSingle(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:225 +0x3d8

net.func·015(0x0, 0x0, 0x0, 0x2402c0, 0x3, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:158 +0xde

net.dial(0x2402c0, 0x3, 0xb4c8748, 0xc212a878d0, 0xafbbcd8, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/fd_unix.go:40 +0x45

net.(*Dialer).Dial(0xafbbd78, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:165 +0x3e0

net.Dial(0x2402c0, 0x3, 0xc210d161e0, 0x15, 0x0, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/dial.go:138 +0x75

net/http.(*Transport).dial(0xc210057280, 0x2402c0, 0x3, 0xc210d161e0, 0x15, ...)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:401 +0xd4

net/http.(*Transport).dialConn(0xc210057280, 0xc2112efa80, 0x0, 0x0, 0x0)

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:444 +0x6e

net/http.func·014()

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:419 +0x3e

created by net/http.(*Transport).getConn

/usr/local/Cellar/go/1.2/libexec/src/pkg/net/http/transport.go:421 +0x11a

我正在具有16GB RAM和2.6GHz Intel Core i5处理器的Mac OSX 10.9.2笔记本电脑上运行此脚本。

我该怎么办才能用尽可能多的并发HTTP请求“淹没”我的笔记本电脑?

回答:

正如Rob Napier所建议的那样,您几乎可以肯定达到文件描述符限制。

改进的并发版本:

该程序创建了maxgoroutine

的工作程序池,该程序将请求从通道中拉出,处理并在响应通道上发送。请求由a排队dispatcher,goroutines

由a排队workerPoolworkers每次一次处理一个作业,直到请求通道为空,然后consumer处理响应通道,直到成功响应的数量等于请求的数量。

package main

import (

"flag"

"fmt"

"log"

"net/http"

"runtime"

"time"

)

var (

reqs int

max int

)

func init() {

flag.IntVar(&reqs, "reqs", 1000000, "Total requests")

flag.IntVar(&max, "concurrent", 200, "Maximum concurrent requests")

}

type Response struct {

*http.Response

err error

}

// Dispatcher

func dispatcher(reqChan chan *http.Request) {

defer close(reqChan)

for i := 0; i < reqs; i++ {

req, err := http.NewRequest("GET", "http://localhost/", nil)

if err != nil {

log.Println(err)

}

reqChan <- req

}

}

// Worker Pool

func workerPool(reqChan chan *http.Request, respChan chan Response) {

t := &http.Transport{}

for i := 0; i < max; i++ {

go worker(t, reqChan, respChan)

}

}

// Worker

func worker(t *http.Transport, reqChan chan *http.Request, respChan chan Response) {

for req := range reqChan {

resp, err := t.RoundTrip(req)

r := Response{resp, err}

respChan <- r

}

}

// Consumer

func consumer(respChan chan Response) (int64, int64) {

var (

conns int64

size int64

)

for conns < int64(reqs) {

select {

case r, ok := <-respChan:

if ok {

if r.err != nil {

log.Println(r.err)

} else {

size += r.ContentLength

if err := r.Body.Close(); err != nil {

log.Println(r.err)

}

}

conns++

}

}

}

return conns, size

}

func main() {

flag.Parse()

runtime.GOMAXPROCS(runtime.NumCPU())

reqChan := make(chan *http.Request)

respChan := make(chan Response)

start := time.Now()

go dispatcher(reqChan)

go workerPool(reqChan, respChan)

conns, size := consumer(respChan)

took := time.Since(start)

ns := took.Nanoseconds()

av := ns / conns

average, err := time.ParseDuration(fmt.Sprintf("%d", av) + "ns")

if err != nil {

log.Println(err)

}

fmt.Printf("Connections:\t%d\nConcurrent:\t%d\nTotal size:\t%d bytes\nTotal time:\t%s\nAverage time:\t%s\n", conns, max, size, took, average)

}

产生:

连接:1000000

并发:200

总大小:15000000字节

总时间:36m39.6778103s

平均时间:2.199677ms

警告:这 非常 迅速打系统资源限制。在我的笔记本电脑上,超过206位并发工作人员导致我的本地测试Web服务器崩溃!

操场

下面的程序使用缓冲区chan

bool作为信号量通道,它限制了并发请求的数量。您可以调整此数字以及请求的总数,以便对系统进行压力测试并确定最大值。

package main

import (

"fmt"

"net/http"

"runtime"

"time"

)

type Resp struct {

*http.Response

err error

}

func makeResponses(reqs int, rc chan Resp, sem chan bool) {

defer close(rc)

defer close(sem)

for reqs > 0 {

select {

case sem <- true:

req, _ := http.NewRequest("GET", "http://localhost/", nil)

transport := &http.Transport{}

resp, err := transport.RoundTrip(req)

r := Resp{resp, err}

rc <- r

reqs--

default:

<-sem

}

}

}

func getResponses(rc chan Resp) int {

conns := 0

for {

select {

case r, ok := <-rc:

if ok {

conns++

if r.err != nil {

fmt.Println(r.err)

} else {

// Do something with response

if err := r.Body.Close(); err != nil {

fmt.Println(r.err)

}

}

} else {

return conns

}

}

}

}

func main() {

reqs := 100000

maxConcurrent := 1000

runtime.GOMAXPROCS(runtime.NumCPU())

rc := make(chan Resp)

sem := make(chan bool, maxConcurrent)

start := time.Now()

go makeResponses(reqs, rc, sem)

conns := getResponses(rc)

end := time.Since(start)

fmt.Printf("Connections: %d\nTotal time: %s\n", conns, end)

}

这将打印如下内容:

连接数:100000

总时间:6m8.2554629s

该测试是在本地Web服务器上完成的,每个请求返回的总响应大小为85B,因此这不是实际的结果。另外,除了关闭它的主体之外,我不对响应进行任何处理。

在最多1000个并发请求中,我的笔记本电脑仅花了6分钟以上的时间就完成了100,000个请求,因此我估计一百万个将花费一个小时。调整maxConcurrent变量将帮助您获得系统的最佳性能。

以上是 如何有效地“最大化”并发HTTP请求? 的全部内容, 来源链接: utcz.com/qa/409433.html

回到顶部