Java 8并行流中的自定义线程池

是否可以为Java 8 并行流指定自定义线程池?我在任何地方都找不到。

假设我有一个服务器应用程序,并且想使用并行流。但是应用程序很大并且是多线程的,所以我想将其划分。我不希望一个模块中的一个模块中的某个模块运行缓慢,而另一个模块中的任务却运行缓慢。

如果不能为不同的模块使用不同的线程池,则意味着在大多数实际情况下,我不能安全地使用并行流。

请尝试以下示例。在单独的线程中执行一些CPU密集型任务。任务利用并行流。第一项任务已中断,因此每个步骤需要1秒钟(由线程睡眠模拟)。问题是其他线程被卡住并等待中断的任务完成。这是一个人为的示例,但请想象一个servlet应用程序,有人向共享的fork联接池提交了长时间运行的任务。

public class ParallelTest {

public static void main(String[] args) throws InterruptedException {

ExecutorService es = Executors.newCachedThreadPool();

es.execute(() -> runTask(1000)); //incorrect task

es.execute(() -> runTask(0));

es.execute(() -> runTask(0));

es.execute(() -> runTask(0));

es.execute(() -> runTask(0));

es.execute(() -> runTask(0));

es.shutdown();

es.awaitTermination(60, TimeUnit.SECONDS);

}

private static void runTask(int delay) {

range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()

.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));

}

public static boolean isPrime(long n) {

return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);

}

}

回答:

实际上有一个技巧,如何在特定的fork-join池中执行并行操作。如果你将它作为一个任务在fork-join池中执行,它将停留在该位置并且不使用公共池。

final int parallelism = 4;

ForkJoinPool forkJoinPool = null;

try {

forkJoinPool = new ForkJoinPool(parallelism);

final List<Integer> primes = forkJoinPool.submit(() ->

// Parallel task here, for example

IntStream.range(1, 1_000_000).parallel()

.filter(PrimesPrint::isPrime)

.boxed().collect(Collectors.toList())

).get();

System.out.println(primes);

} catch (InterruptedException | ExecutionException e) {

throw new RuntimeException(e);

} finally {

if (forkJoinPool != null) {

forkJoinPool.shutdown();

}

}

该技巧基于ForkJoinTask.fork,它指定:“安排在当前任务正在运行的池中异步执行此任务,如果适用,或者如果不包含inForkJoinPool(),则使用ForkJoinPool.commonPool()”。

以上是 Java 8并行流中的自定义线程池 的全部内容, 来源链接: utcz.com/qa/430594.html

回到顶部