拦截 WebClient ,遇到 Only one connection receive subscriber allowed

拦截 WebClient 请求响应数据,遇到 Only one connection receive subscriber allowed。

@Test

public void testWebClient2() {

WebClient client = WebClient.builder()

.filter(logResponse()).build();

Mono<String> mono = client.get().uri("https://www.baidu.com").retrieve().bodyToMono(String.class);

System.out.println(mono.block());

}

private ExchangeFilterFunction logResponse() {

return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {

log.info("Response Status : " + clientResponse.statusCode());

Mono<String> body = clientResponse.bodyToMono(String.class);

body.subscribe(str -> System.out.println(str));

return Mono.just(clientResponse);

});

}

运行结果:

java.lang.IllegalStateException: Only one connection receive subscriber allowed.

at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:304)

Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:

Error has been observed at the following site(s):

|_ checkpoint ⇢ Body from GET https://www.baidu.com [DefaultClientResponse]

Stack trace:

at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:304)

filter里面消费了response body, 业务代码就不能消费了,目的是收集 WebClient 的请求和响应数据,有什么方式解决吗?求解,谢谢。


回答:

@Test

public void testWeb() throws InterruptedException {

WebClient client = WebClient

.builder()

.filter(logResponse())

.build();

client

.get()

.uri("https://www.baidu.com")

.retrieve()

.bodyToMono(String.class)

.subscribe(s->System.out.println(s));

TimeUnit.SECONDS.sleep(2);

}

private ExchangeFilterFunction logResponse() {

return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {

return clientResponse

.bodyToMono(String.class)

.doOnNext(s -> System.out.println("aaa" + s))

.map(s -> ClientResponse.from(clientResponse).body(s).build())

;

});

}

你可以尝试下这个。不过这违背了流的初衷,属于非主流操作。


回答:


你消费了两次返回值


首先你可以考虑不再logResponse()上消费可以尝试是用Mono#doOnSuccess,而且你用 mono.block() 是同步的,那用Mono,Flux这种异步库其实有点画龙点睛了

 @Test

public void testWebClient2() {

WebClient client = WebClient.builder()

.filter(logResponse()).build();

Mono<String> mono = client

.get()

.uri("https://www.baidu.com")

.retrieve()

.bodyToMono(String.class)

.doOnSuccess(it -> {

// 一些其他操作

System.out.println(it);

});

mono.block();

// System.out.println(mono.block());

}

private ExchangeFilterFunction logResponse() {

return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {

log.info("Response Status : " + clientResponse.statusCode());

return Mono.just(clientResponse);

});

}

以上是 拦截 WebClient ,遇到 Only one connection receive subscriber allowed 的全部内容, 来源链接: utcz.com/p/944390.html

回到顶部