Java Reactor Flux 并发生产问题

如下例子

 static Mono<Dog> getDog(Integer id) {

System.out.println(Thread.currentThread().getName() + " getDog() 等待 2 秒");

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

return Mono.fromCallable(() -> new Dog(id)).subscribeOn(Schedulers.boundedElastic());

}

static class Dog {

Integer id;

public Dog(Integer id) {

this.id = id;

}

public Integer getId() {

return id;

}

@Override

public String toString() {

return "Dog{" +

"id=" + id +

'}';

}

}

public static void main(String[] args) throws InterruptedException {

Integer[] ids = {1, 2, 3, 4};

Flux.merge(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3]))

.subscribeOn(Schedulers.boundedElastic())

.toStream().forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));

}

结果

main getDog() 等待 2 秒

main getDog() 等待 2 秒

main getDog() 等待 2 秒

main getDog() 等待 2 秒

main :: Dog{id=1}

main :: Dog{id=2}

main :: Dog{id=3}

main :: Dog{id=4}

在网上查询一下,有文章说可以通过 create 方式来解决,

List<Mono<Dog>> list = Arrays.asList(getDog(1), getDog(2), getDog(3));

Flux.create(sink -> {

for (Mono<Dog> dog : list) {

sink.next(dog);

}

}).subscribeOn(Schedulers.boundedElastic(), true).toStream()

.forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));

// output:

main getDog() 等待 2 秒

main getDog() 等待 2 秒

main getDog() 等待 2 秒

main :: MonoSubscribeOnCallable

main :: MonoSubscribeOnCallable

main :: MonoSubscribeOnCallable

这样并没有使用多线程生产。
也尝试用 zip方法,也没有达到想要的效果。

Integer[] ids = {1, 2, 3, 4};

Flux.zip(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3])).subscribeOn(Schedulers.boundedElastic(), true).toStream()

.forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));

//output:

main getDog() 等待 2 秒

main getDog() 等待 2 秒

main getDog() 等待 2 秒

main getDog() 等待 2 秒

main :: [Dog{id=1},Dog{id=2},Dog{id=3},Dog{id=4}]

想让 merge方法里面的 getDog 方法通过多线程执行,最后 merge 一个集合,不知道有什么方式来解决, 请教一下,谢谢。


回答:

Flux

.fromArray(ids)

.parallel()

.runOn(Schedulers.parallel())

.map(i->getDog(i))

.subscribe(d->System.out.println(d));

这样可以

已参与了 SegmentFault 思否社区 10 周年「问答」打卡 ,欢迎正在阅读的你也加入。


回答:

public Mono<Long> packageMono() {

return Mono.just(System.currentTimeMillis())

// 延迟一秒模拟请求

.delayElement(Duration.of(1L, ChronoUnit.SECONDS));

}

@Test

public void testRandom() throws Exception {

final Tuple3<Long, Long, Long> test = Mono

.zip(packageMono(), packageMono(), packageMono())

.block();

System.out.println(test);

}

参考 https://stackoverflow.com/que...


回答:

经过这两天网上查询的资料,大致写出来,在这里记录一下。

 static Random random = new Random();

static Mono<Dog> getDog(Integer id) {

return Mono.fromCallable(() -> {

System.out.println( Thread.currentThread().getName() + " getDog() 等待 2 秒");

try {

long time = random.nextInt(2000);

Thread.sleep(time);

} catch (InterruptedException e) {

e.printStackTrace();

}

return new Dog(id);

}).subscribeOn(Schedulers.boundedElastic()).timeout(Duration.of(2, ChronoUnit.SECONDS)).onErrorReturn(new Dog(100));

}

public static void main(String[] args) throws InterruptedException {

Integer[] ids = {1, 2, 3, 4};

Flux.merge(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3]))

.toStream().forEach(dog -> System.out.println(Thread.currentThread().getName() + " : " + dog));

}

运行结果如下:

boundedElastic-1 getDog() 等待 2 秒

boundedElastic-4 getDog() 等待 2 秒

boundedElastic-3 getDog() 等待 2 秒

boundedElastic-2 getDog() 等待 2 秒

main : Dog{id=3}

main : Dog{id=4}

main : Dog{id=1}

main : Dog{id=2}

以上是 Java Reactor Flux 并发生产问题 的全部内容, 来源链接: utcz.com/p/944531.html

回到顶部