我可以使用ForkJoinPool的工作窃取行为来避免线程饥饿死锁吗?

一个 线程死锁饥饿 如果池中的所有线程都在等待在同一池中,以完成队列任务发生在一个正常的线程池。

ForkJoinPool通过从join()调用内部的其他线程中窃取工作来避免此问题,而不仅仅是等待。例如:

private static class ForkableTask extends RecursiveTask<Integer> {

private final CyclicBarrier barrier;

ForkableTask(CyclicBarrier barrier) {

this.barrier = barrier;

}

@Override

protected Integer compute() {

try {

barrier.await();

return 1;

} catch (InterruptedException | BrokenBarrierException e) {

throw new RuntimeException(e);

}

}

}

@Test

public void testForkJoinPool() throws Exception {

final int parallelism = 4;

final ForkJoinPool pool = new ForkJoinPool(parallelism);

final CyclicBarrier barrier = new CyclicBarrier(parallelism);

final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);

for (int i = 0; i < parallelism; ++i) {

forkableTasks.add(new ForkableTask(barrier));

}

int result = pool.invoke(new RecursiveTask<Integer>() {

@Override

protected Integer compute() {

for (ForkableTask task : forkableTasks) {

task.fork();

}

int result = 0;

for (ForkableTask task : forkableTasks) {

result += task.join();

}

return result;

}

});

assertThat(result, equalTo(parallelism));

}

但是,使用到的ExecutorService接口时ForkJoinPool,似乎不会发生窃取工作的情况。例如:

private static class CallableTask implements Callable<Integer> {

private final CyclicBarrier barrier;

CallableTask(CyclicBarrier barrier) {

this.barrier = barrier;

}

@Override

public Integer call() throws Exception {

barrier.await();

return 1;

}

}

@Test

public void testWorkStealing() throws Exception {

final int parallelism = 4;

final ExecutorService pool = new ForkJoinPool(parallelism);

final CyclicBarrier barrier = new CyclicBarrier(parallelism);

final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));

int result = pool.submit(new Callable<Integer>() {

@Override

public Integer call() throws Exception {

int result = 0;

// Deadlock in invokeAll(), rather than stealing work

for (Future<Integer> future : pool.invokeAll(callableTasks)) {

result += future.get();

}

return result;

}

}).get();

assertThat(result, equalTo(parallelism));

}

粗略地看一下ForkJoinPool的实现,所有常规ExecutorServiceAPI都是使用ForkJoinTasks

实现的,因此我不确定为什么会发生死锁。

回答:

您几乎要回答自己的问题。解决方案是声明“

ForkJoinPool通过从join()调用内部的其他线程中窃取工作来避免此问题”。只要线程由于某些其他原因而被阻塞(除外)ForkJoinPool.join(),就不会发生窃取工作的情况,线程只是等待而什么也不做。

原因是在Java中,无法ForkJoinPool阻止其线程阻塞,而是给它们提供其他处理方法。线程 本身

需要避免阻塞,而是要求池进行应做的工作。并且这仅在该ForkJoinTask.join()方法中实现,而不能在任何其他阻塞方法中实现。如果在Future内部使用ForkJoinPool,您还将看到饥饿死锁。

为什么仅ForkJoinTask.join()在Java

API中的任何其他阻止方法中而不是在其他方法中实现工作窃取?嗯,有很多这样的阻塞方法(Object.wait()Future.get()中的任何并发原语java.util.concurrent,I

/

O方法等),它们与无关ForkJoinPool,后者只是API中的任意类,因此在所有情况下都添加了特殊情况这些方法将是不好的设计。这也可能导致非常令人惊讶和不希望的后果。假设有一个用户将一个任务传递给一个ExecutorService等待一个的Future,然后发现该任务挂起的时间很长,Future.get()原因是正在运行的线程偷走了其他一些(长时间运行的)工作项,而不是等待Future并在结果可用后立即继续。一旦线程开始处理另一个任务,它就不能返回到原始任务,直到第二个任务完成为止。因此,其他阻止方法不进行工作窃取实际上是一件好事。对于a

ForkJoinTask,这个问题不存在,因为不重要的是尽快继续执行主要任务,因此重要的是尽可能高效地共同处理所有任务。

也无法实现您自己的方法来在内进行偷窃工作ForkJoinPool,因为所有相关部分都不公开。

但是,实际上还有第二种方法可以防止饥饿死锁。这称为

。它不使用工作窃取(以避免上面提到的问题),但是还需要将要阻塞的线程与线程池进行积极协作。使用托管阻塞,线程告诉线程池它可能在被阻塞 之前

它调用潜在的阻塞方法,并在阻塞方法完成时通知池。然后,线程池知道存在饥饿死锁的风险,并且如果其所有线程当前都处于某个阻塞操作中并且还有其他任务要执行,则可能会产生其他线程。请注意,由于附加线程的开销,这效率不如工作窃取。如果您使用普通期货和托管分块来实现递归并行算法,而不是使用ForkJoinTask加上工作偷窃,额外线程的数量可能会非常大(因为在算法的“划分”阶段,将创建许多任务并将其分配给立即阻塞并等待子任务结果的线程)。但是,仍然可以防止出现饥饿死锁,并且避免了一个任务必须等待很长时间的问题,因为该任务的线程同时开始处理另一个任务。

ForkJoinPoolJava的支持托管阻塞。为了使用它,需要实现一个接口ForkJoinPool.ManagedBlocker,以便从block该接口的方法中调用任务要执行的潜在阻塞方法。然后,任务可能不会直接调用阻塞方法,而是需要调用static方法ForkJoinPool.managedBlock(ManagedBlocker)。此方法在阻塞之前和之后处理与线程池的通信。如果当前任务未在内执行ForkJoinPool,那么它也可以工作,它仅调用阻塞方法。

我在Java API(适用于Java 7)中找到的唯一实际使用托管阻塞的地方是class

Phaser。(此类是像互斥锁和闩锁一样的同步屏障,但更灵活,更强大。)因此,与任务Phaser内部同步ForkJoinPool时应使用托管阻塞,并且可以避免饥饿死锁(但ForkJoinTask.join()仍可取,因为它使用工作窃取而不是托管阻塞)

。无论您是ForkJoinPool直接使用还是通过其ExecutorService界面使用,此方法都有效。但是,如果您使用其他任何ExecutorService类(例如由类创建的类)Executors,则将无法使用,因为它们不支持托管阻塞。

在Scala中,托管阻塞的使用更加广泛(description,API)。

以上是 我可以使用ForkJoinPool的工作窃取行为来避免线程饥饿死锁吗? 的全部内容, 来源链接: utcz.com/qa/422758.html

回到顶部