Java执行程序:如何在任务完成时无阻碍地得到通知?

假设我有一个充满任务的队列,需要将其提交给执行者服务。我希望他们一次处理一个。我能想到的最简单的方法是:

  1. 从队列中接任务
  2. 提交给执行者
  3. 在返回的Future上调用.get并阻塞,直到获得结果为止
  4. 从队列中执行另一个任务…

    但是,我试图避免完全阻止。如果我有10,000个这样的队列,需要一次处理一个任务,那么我的堆栈空间将用完,因为它们中的大多数都将保留阻塞的线程。

我想要提交一个任务并提供一个回叫,当任务完成时会调用该回叫。我将使用该回调通知作为发送下一个任务的标志。(functionaljava和jetlang显然使用了这种非阻塞算法,但我无法理解它们的代码)

如果不编写自己的执行程序服务,如何使用JDK的java.util.concurrent做到这一点?

(向我提供这些任务的队列本身可能会阻塞,但这是稍后要解决的问题)

回答:

定义一个回调接口,以接收要在完成通知中传递的任何参数。然后在任务结束时调用它。

你甚至可以为Runnable任务编写通用包装,并将其提交给ExecutorService。或者,请参阅下面的Java 8中内置的机制。

class CallbackTask implements Runnable {

private final Runnable task;

private final Callback callback;

CallbackTask(Runnable task, Callback callback) {

this.task = task;

this.callback = callback;

}

public void run() {

task.run();

callback.complete();

}

}

使用CompletableFutureJava 8,Java 8包含了一种更精细的方法来组成管道,在该管道中可以异步和有条件地完成进程。这是一个人为但完整的通知示例。

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.ThreadLocalRandom;

import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

public static void main(String... argv) throws Exception {

ExampleService svc = new ExampleService();

GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();

CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);

f.thenAccept(listener::notify);

System.out.println("Exiting main()");

}

void notify(String msg) {

System.out.println("Received message: " + msg);

}

}

class ExampleService {

String work() {

sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */

char[] str = new char[5];

ThreadLocalRandom current = ThreadLocalRandom.current();

for (int idx = 0; idx < str.length; ++idx)

str[idx] = (char) ('A' + current.nextInt(26));

String msg = new String(str);

System.out.println("Generated message: " + msg);

return msg;

}

public static void sleep(long average, TimeUnit unit) {

String name = Thread.currentThread().getName();

long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));

System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);

try {

unit.sleep(timeout);

System.out.println(name + " awoke.");

} catch (InterruptedException abort) {

Thread.currentThread().interrupt();

System.out.println(name + " interrupted.");

}

}

public static long exponential(long avg) {

return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));

}

}

以上是 Java执行程序:如何在任务完成时无阻碍地得到通知? 的全部内容, 来源链接: utcz.com/qa/422938.html

回到顶部