拦截 WebClient ,遇到 Only one connection receive subscriber allowed
拦截 WebClient 请求响应数据,遇到 Only one connection receive subscriber allowed。
@Test public void testWebClient2() {
WebClient client = WebClient.builder()
.filter(logResponse()).build();
Mono<String> mono = client.get().uri("https://www.baidu.com").retrieve().bodyToMono(String.class);
System.out.println(mono.block());
}
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
log.info("Response Status : " + clientResponse.statusCode());
Mono<String> body = clientResponse.bodyToMono(String.class);
body.subscribe(str -> System.out.println(str));
return Mono.just(clientResponse);
});
}
运行结果:
java.lang.IllegalStateException: Only one connection receive subscriber allowed. at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:304)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Body from GET https://www.baidu.com [DefaultClientResponse]
Stack trace:
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:304)
filter里面消费了response body, 业务代码就不能消费了,目的是收集 WebClient 的请求和响应数据,有什么方式解决吗?求解,谢谢。
回答:
@Test
public void testWeb() throws InterruptedException {
WebClient client = WebClient
.builder()
.filter(logResponse())
.build();
client
.get()
.uri("https://www.baidu.com")
.retrieve()
.bodyToMono(String.class)
.subscribe(s->System.out.println(s));
TimeUnit.SECONDS.sleep(2);
}
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
return clientResponse
.bodyToMono(String.class)
.doOnNext(s -> System.out.println("aaa" + s))
.map(s -> ClientResponse.from(clientResponse).body(s).build())
;
});
}
你可以尝试下这个。不过这违背了流的初衷,属于非主流操作。
回答:
你消费了两次返回值
首先你可以考虑不再logResponse()上消费可以尝试是用Mono#doOnSuccess,而且你用 mono.block() 是同步的,那用Mono,Flux这种异步库其实有点画龙点睛了
@Test public void testWebClient2() {
WebClient client = WebClient.builder()
.filter(logResponse()).build();
Mono<String> mono = client
.get()
.uri("https://www.baidu.com")
.retrieve()
.bodyToMono(String.class)
.doOnSuccess(it -> {
// 一些其他操作
System.out.println(it);
});
mono.block();
// System.out.println(mono.block());
}
private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
log.info("Response Status : " + clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
以上是 拦截 WebClient ,遇到 Only one connection receive subscriber allowed 的全部内容, 来源链接: utcz.com/p/944390.html