Reactor异步线程的变量传递

编程

threadlocal的问题

在传统的请求/应答同步模式中,使用threadlocal来传递上下文变量是非常方便的,可以省得在每个方法参数添加公用的变量,比如当前登录用户。但是业务方法可能使用了async或者在其他线程池中异步执行,这个时候threadlocal的作用就失效了。

这个时候的解决办法就是采取propagation模式,即在同步线程与异步线程衔接处传播这个变量。

TaskDecorator

比如spring就提供了TaskDecorator,通过实现这个接口,可以自己控制传播那些变量。例如:

classMdcTaskDecoratorimplementsTaskDecorator{

@Override

public Runnable decorate(Runnable runnable){

// Right now: Web thread context !

// (Grab the current thread MDC data)

Map<String, String> contextMap = MDC.getCopyOfContextMap();

return()->{

try{

// Right now: @Async thread context !

// (Restore the Web thread context"s MDC data)

MDC.setContextMap(contextMap);

runnable.run();

}finally{

MDC.clear();

}

};

}

}

这里注意在finally里头clear

配置这个taskDecorator

@EnableAsync

@Configuration

publicclassAsyncConfigimplementsAsyncConfigurer{

@Override

publicExecutorgetAsyncExecutor(){

ThreadPoolTaskExecutor executor =newThreadPoolTaskExecutor();

executor.setTaskDecorator(newMdcTaskDecorator());

executor.initialize();

return executor;

}

}

完整实例详见Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads

Reactor Context

spring5引入webflux,其底层是基于reactor,那么reactor如何进行上下文变量的传播呢?官方提供了Context对象来替代threadlocal。

其特性如下:

  • 类似map的kv操作,比如put(Object key, Object value),putAll(Context), hasKey(Object key)
  • immutable,即同一个key,后面put不会覆盖
  • 提供getOrDefault,getOrEmpty方法
  • Context与作用链上的每个Subscriber绑定
  • 通过subscriberContext(Context)来访问
  • Context的作用是自底向上

实例

设置及读取

@Test

public voidtestSubscriberContext(){

String key ="message";

Mono<String> r = Mono.just("Hello")

.flatMap( s -> Mono.subscriberContext()

.map( ctx -> s +" "+ ctx.get(key)))

.subscriberContext(ctx -> ctx.put(key,"World"));

StepVerifier.create(r)

.expectNext("Hello World")

.verifyComplete();

}

这里从最底部的subscriberContext设置message值为World,然后flatMap里头通过subscriberContext来访问。

自底向上

@Test

public void testContextSequence(){

String key ="message";

Mono<String> r = Mono.just("Hello")

//NOTE 这个subscriberContext设置的太高了

.subscriberContext(ctx -> ctx.put(key,"World"))

.flatMap( s -> Mono.subscriberContext()

.map( ctx -> s +" "+ ctx.getOrDefault(key,"Stranger")));

StepVerifier.create(r)

.expectNext("Hello Stranger")

.verifyComplete();

}

由于这个例子的subscriberContext设置的太高了,不能作用在flatMap里头的Mono.subscriberContext()

不可变

    @Test

publicvoidtestContextImmutable(){

String key ="message";

Mono<String> r = Mono.subscriberContext()

.map( ctx -> ctx.put(key,"Hello"))

//这里返回了一个新的,因此上面的设置失效了

.flatMap( ctx -> Mono.subscriberContext())

.map( ctx -> ctx.getOrDefault(key,"Default"));

StepVerifier.create(r)

.expectNext("Default")

.verifyComplete();

}

subscriberContext永远返回一个新的

多个连续的subscriberContext

@Test

public void testReadOrder(){

String key ="message";

Mono<String> r = Mono.just("Hello")

.flatMap( s -> Mono.subscriberContext()

.map( ctx -> s +" "+ ctx.get(key)))

.subscriberContext(ctx -> ctx.put(key,"Reactor"))

.subscriberContext(ctx -> ctx.put(key,"World"));

StepVerifier.create(r)

.expectNext("Hello Reactor")

.verifyComplete();

}

operator只会读取离它最近的一个context

flatMap间的subscriberContext

@Test

public void testContextBetweenFlatMap(){

String key ="message";

Mono<String> r = Mono.just("Hello")

.flatMap( s -> Mono.subscriberContext()

.map( ctx -> s +" "+ ctx.get(key)))

.subscriberContext(ctx -> ctx.put(key,"Reactor"))

.flatMap( s -> Mono.subscriberContext()

.map( ctx -> s +" "+ ctx.get(key)))

.subscriberContext(ctx -> ctx.put(key,"World"));

StepVerifier.create(r)

.expectNext("Hello Reactor World")

.verifyComplete();

}

flatMap读取离它最近的context

flatMap中的subscriberContext

@Test

public void testContextInFlatMap(){

String key ="message";

Mono<String> r =

Mono.just("Hello")

.flatMap( s -> Mono.subscriberContext()

.map( ctx -> s +" "+ ctx.get(key))

)

.flatMap( s -> Mono.subscriberContext()

.map( ctx -> s +" "+ ctx.get(key))

.subscriberContext(ctx -> ctx.put(key,"Reactor"))

)

.subscriberContext(ctx -> ctx.put(key,"World"));

StepVerifier.create(r)

.expectNext("Hello World Reactor")

.verifyComplete();

}

这里第一个flatMap无法读取第二个flatMap内部的context

小结

reactor通过提供Context来实现了类似同步线程threadlocal的功能,非常强大,值得好好琢磨。

doc

  • TaskDecorator
  • Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
  • HOW TO PASS CONTEXT IN STANDARD WAY - WITHOUT THREADLOCAL
  • Spring Security Context Propagation with @Async
  • 如何在async线程中访问RequestContextHolder
  • Context Aware Java Executor and Spring"s @Async
  • 8.8.1. The Context API

以上是 Reactor异步线程的变量传递 的全部内容, 来源链接: utcz.com/z/513717.html

回到顶部