异步httpclient(httpasyncclient)的使用与总结
1. 前言
应用层的网络模型有同步与异步。同步意味当前线程是阻塞的,只有本次请求完成后才能进行下一次请求;异步意味着所有的请求可以同时塞入缓冲区,不阻塞当前的线程;
httpclient在4.x之后开始提供基于nio的异步版本httpasyncclient,httpasyncclient借助了Java并发库和nio进行封装(虽说NIO是同步非阻塞IO,但是HttpAsyncClient提供了回调的机制,与netty类似,所以可以模拟类似于AIO的效果),其调用方式非常便捷,但是其中也有许多需要注意的地方。
2. pom文件
本文依赖4.1.2,当前最新的客户端版本是4.1.3maven repository 地址
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>4.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.2</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
3. 简单的实例
public class TestHttpClient {
public static void main(String[] args){
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000)
.setSocketTimeout(50000)
.setConnectionRequestTimeout(1000)
.build();
//配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
setIoThreadCount(Runtime.getRuntime().availableProcessors())
.setSoKeepAlive(true)
.build();
//设置连接池大小
ConnectingIOReactor ioReactor=null;
try {
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
} catch (IOReactorException e) {
e.printStackTrace();
}
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
connManager.setMaxTotal(100);
connManager.setDefaultMaxPerRoute(100);
final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();
//构造请求
String url = "http://127.0.0.1:9200/_bulk";
HttpPost httpPost = new HttpPost(url);
StringEntity entity = null;
try {
String a = "{ "index": { "_index": "test", "_type": "test"} }
" +
"{"name": "上海","age":33}
";
entity = new StringEntity(a);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
httpPost.setEntity(entity);
//start
client.start();
//异步请求
client.execute(httpPost, new Back());
while(true){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Back implements FutureCallback<HttpResponse>{
private long start = System.currentTimeMillis();
Back(){
}
public void completed(HttpResponse httpResponse) {
try {
System.out.println("cost is:"+(System.currentTimeMillis()-start)+":"+EntityUtils.toString(httpResponse.getEntity()));
} catch (IOException e) {
e.printStackTrace();
}
}
public void failed(Exception e) {
System.err.println(" cost is:"+(System.currentTimeMillis()-start)+":"+e);
}
public void cancelled() {
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
4. 几个重要的参数
4.1 TimeOut(3个)的设置
ConnectTimeout : 连接超时,连接建立时间,三次握手完成时间。
SocketTimeout : 请求超时,数据传输过程中数据包之间间隔的最大时间。
ConnectionRequestTimeout : 使用连接池来管理连接,从连接池获取连接的超时时间。
在实际项目开发过程中,这三个值可根据具体情况设置。
(1) 下面针对ConnectionRequestTimeout的情况进行分析
实验条件:设置连接池最大连接数为1,每一个异步请求从开始到回调的执行时间在100ms以上;
实验过程:连续发送2次请求
public class TestHttpClient {
public static void main(String[] args){
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000)
.setSocketTimeout(50000)
.setConnectionRequestTimeout(10)//设置为10ms
.build();
//配置io线程
IOReactorConfig ioReactorConfig = IOReactorConfig.custom().
setIoThreadCount(Runtime.getRuntime().availableProcessors())
.setSoKeepAlive(true)
.build();
//设置连接池大小
ConnectingIOReactor ioReactor=null;
try {
ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
} catch (IOReactorException e) {
e.printStackTrace();
}
PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(ioReactor);
connManager.setMaxTotal(1);//最大连接数设置1
connManager.setDefaultMaxPerRoute(1);//per route最大连接数设置1
final CloseableHttpAsyncClient client = HttpAsyncClients.custom().
setConnectionManager(connManager)
.setDefaultRequestConfig(requestConfig)
.build();
//构造请求
String url = "http://127.0.0.1:9200/_bulk";
List<HttpPost> list = new ArrayList<HttpPost>();
for(int i=0;i<2;i++){
HttpPost httpPost = new HttpPost(url);
StringEntity entity = null;
try {
String a = "{ "index": { "_index": "test", "_type": "test"} }
" +
"{"name": "上海","age":33}
";
entity = new StringEntity(a);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
httpPost.setEntity(entity);
list.add(httpPost);
}
client.start();
for(int i=0;i<2;i++){
client.execute(list.get(i), new Back());
}
while(true){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Back implements FutureCallback<HttpResponse>{
private long start = System.currentTimeMillis();
Back(){
}
public void completed(HttpResponse httpResponse) {
try {
System.out.println("cost is:"+(System.currentTimeMillis()-start)+":"+EntityUtils.toString(httpResponse.getEntity()));
} catch (IOException e) {
e.printStackTrace();
}
}
public void failed(Exception e) {
e.printStackTrace();
System.err.println(" cost is:"+(System.currentTimeMillis()-start)+":"+e);
}
public void cancelled() {
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
实验结果 :
第一次请求执行时间在200ms左右
第二请求回调直接抛出TimeOutException
java.util.concurrent.TimeoutException
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:364)
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:344)
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:318)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:303)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.releaseConnection(AbstractClientExchangeHandler.java:239)
at org.apache.http.impl.nio.client.MainClientExec.responseCompleted(MainClientExec.java:387)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:168)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
结果分析:由于连接池大小是1,第一次请求执行后连接被占用(时间在100ms),第二次请求在规定的时间内无法获取连接,于是直接连接获取的TimeOutException
(2) 修改ConnectionRequestTimeout
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(50000)
.setSocketTimeout(50000)
.setConnectionRequestTimeout(1000)//设置为1000ms
.build();
1
2
3
4
5
上述两次请求正常执行。
下面进一步看一下代码中抛异常的地方:
从上面的代码中可以看到如果要设置永不ConnectionRequestTimeout,只需要将ConnectionRequestTimeout设置为小于0即可,当然后这种设置一定要慎用, 如果处理不当,请求堆积会导致OOM。
4.2 连接池大小的设置
ConnTotal:连接池中最大连接数;
ConnPerRoute(1000):分配给同一个route(路由)最大的并发连接数,route为运行环境机器到目标机器的一条线路,举例来说,我们使用HttpClient的实现来分别请求 www.baidu.com 的资源和 www.bing.com 的资源那么他就会产生两个route;
对于上述的实验,在一定程度上可以通过增大最大连接数来解决ConnectionRequestTimeout的问题!
后续:本文重点在于使用,后续会对源码进行分析与解读
参考文档:
1.官网httpcomponents-asyncclient-4.1.x
————————————————
版权声明:本文为CSDN博主「Kevin.Yang」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ouyang111222/java/article/details/78884634
以上是 异步httpclient(httpasyncclient)的使用与总结 的全部内容, 来源链接: utcz.com/z/518403.html