Java Reactor Flux 并发生产问题
如下例子
static Mono<Dog> getDog(Integer id) { System.out.println(Thread.currentThread().getName() + " getDog() 等待 2 秒");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.fromCallable(() -> new Dog(id)).subscribeOn(Schedulers.boundedElastic());
}
static class Dog {
Integer id;
public Dog(Integer id) {
this.id = id;
}
public Integer getId() {
return id;
}
@Override
public String toString() {
return "Dog{" +
"id=" + id +
'}';
}
}
public static void main(String[] args) throws InterruptedException {
Integer[] ids = {1, 2, 3, 4};
Flux.merge(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3]))
.subscribeOn(Schedulers.boundedElastic())
.toStream().forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));
}
结果
main getDog() 等待 2 秒main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: Dog{id=1}
main :: Dog{id=2}
main :: Dog{id=3}
main :: Dog{id=4}
在网上查询一下,有文章说可以通过 create
方式来解决,
List<Mono<Dog>> list = Arrays.asList(getDog(1), getDog(2), getDog(3)); Flux.create(sink -> {
for (Mono<Dog> dog : list) {
sink.next(dog);
}
}).subscribeOn(Schedulers.boundedElastic(), true).toStream()
.forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));
// output:
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: MonoSubscribeOnCallable
main :: MonoSubscribeOnCallable
main :: MonoSubscribeOnCallable
这样并没有使用多线程生产。
也尝试用 zip
方法,也没有达到想要的效果。
Integer[] ids = {1, 2, 3, 4}; Flux.zip(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3])).subscribeOn(Schedulers.boundedElastic(), true).toStream()
.forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));
//output:
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: [Dog{id=1},Dog{id=2},Dog{id=3},Dog{id=4}]
想让 merge
方法里面的 getDog
方法通过多线程执行,最后 merge 一个集合,不知道有什么方式来解决, 请教一下,谢谢。
回答:
Flux .fromArray(ids)
.parallel()
.runOn(Schedulers.parallel())
.map(i->getDog(i))
.subscribe(d->System.out.println(d));
这样可以
已参与了 SegmentFault 思否社区 10 周年「问答」打卡 ,欢迎正在阅读的你也加入。
回答:
public Mono<Long> packageMono() { return Mono.just(System.currentTimeMillis())
// 延迟一秒模拟请求
.delayElement(Duration.of(1L, ChronoUnit.SECONDS));
}
@Test
public void testRandom() throws Exception {
final Tuple3<Long, Long, Long> test = Mono
.zip(packageMono(), packageMono(), packageMono())
.block();
System.out.println(test);
}
参考 https://stackoverflow.com/que...
回答:
经过这两天网上查询的资料,大致写出来,在这里记录一下。
static Random random = new Random(); static Mono<Dog> getDog(Integer id) {
return Mono.fromCallable(() -> {
System.out.println( Thread.currentThread().getName() + " getDog() 等待 2 秒");
try {
long time = random.nextInt(2000);
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Dog(id);
}).subscribeOn(Schedulers.boundedElastic()).timeout(Duration.of(2, ChronoUnit.SECONDS)).onErrorReturn(new Dog(100));
}
public static void main(String[] args) throws InterruptedException {
Integer[] ids = {1, 2, 3, 4};
Flux.merge(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3]))
.toStream().forEach(dog -> System.out.println(Thread.currentThread().getName() + " : " + dog));
}
运行结果如下:
boundedElastic-1 getDog() 等待 2 秒boundedElastic-4 getDog() 等待 2 秒
boundedElastic-3 getDog() 等待 2 秒
boundedElastic-2 getDog() 等待 2 秒
main : Dog{id=3}
main : Dog{id=4}
main : Dog{id=1}
main : Dog{id=2}
以上是 Java Reactor Flux 并发生产问题 的全部内容, 来源链接: utcz.com/p/944531.html