Java的Future接口

java

Java的Future接口

Java的Future接口

Java 中的 Future 接口和其实现类 FutureTask,代表了异步计算的结果。

1. Future接口简介

Future 是异步计算结果的容器接口,它提供了下面这些功能:

  • 在等待异步计算完成时检查计算结果是否完成
  • 在异步计算完成后获取计算结果
  • 在异步计算完成前取消

Future 可以用于耗时的异步计算任务。例如我们把 Runnable 接口或 Callable 接口的实现类提交到线程池时,线程池会返回一个 FutureTask 对象。

<T> Future<T> submit(Callable<T> task)

<T> Future<T> submit(Runnable<T> task, T result)

下文会再解释 FutureTask,这是 Future 接口的一个实现类。

Future 接口提供了下面这些方法

Modifier and TypeMethodDescription
booleancancel(boolean mayInterruptIfRunning)尝试取消执行此任务。
Vget()等待计算完成,然后检索其结果。
Vget(long timeout, TimeUnit unit)如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。
booleanisCancelled()如果此任务在正常完成之前被取消,则返回 true
booleanisDone()返回 true如果任务已完成。

2. FutureTask的使用

可以将 FutureTask 交给 Executor 执行,也可以通过ExecutorService.submit(...)方法返回一个 FutureTask,然后执行 get 方法或 cancel 方法。

也可以单独使用 FutureTask,比如下面的代码就实现了一种需求:一个线程必须等待另一个线程把某个任务执行完后它才能继续执行。假设有多个线程执行若干个任务,每个任务最多只能同时被执行一次,多个线程试图执行同一个任务时,只允许一个线程执行任务,其他线程等待这个任务执行完后才能继续执行。

public class ConcurrentTask {

private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<Object, Future<String>>();

private String executionTask(final String taskName) throws ExecutionException, InterruptedException {

while (true) {

Future<String> future = taskCache.get(taskName); //1.1,2.1

if (future == null) {

// 创建 Task

Callable<String> task = new Callable<String>() {

public String call() throws InterruptedException {

//......

return taskName;

}

};

//1.2 创建 FutureTask

FutureTask<String> futureTask = new FutureTask<>(task);

future = taskCache.putIfAbsent(taskName, futureTask); //1.3

// 如果是第一次放入,则尝试执行

if (future == null) {

future = futureTask;

futureTask.run(); //1.4执行任务

}

}

try {

return future.get(); //1.5,2.2线程在此等待任务执行完成

} catch (CancellationException e) {

taskCache.remove(taskName, future);

}

}

}

}

相信不难理解,下面是执行的示意图。

3. FutureTask的实现

FutureTask 的实现基于队列同步器 QAS。

基于复合优先于继承的原则,FutureTask 声明了一个内部私有的,继承于 AQS 的子类 Sync,这对 FutureTask 所有公有方法的调用都会委托给这个内部子类。

FutureTask 的get方法会调用AQS.acquireSharedInterruptibly(int arg)方法,执行过程如下:

  1. 调用AQS.acquireSharedInterruptibly(int arg)方法,首先回调子类 Sync 中的方法tryAcqurieShared判断acquire操作是否可以成功。acquire操作成功的条件为:state 为执行完成状态 RAN 或已取消状态 CANCELLED 且 runner 不为 null。
  2. 如果成功则get方法立即返回,失败则到线程等待队列中去等待其他线程执行release
  3. 当其他线程执行release,如FutureTask.run()FutureTask.cancel(),唤醒当前线程后。当前线程再次执行tryAcquireShared将返回值 1,当前线程离开等待队列并唤醒后续线程。
  4. 最后返回结果或抛出异常。

FutureTask 的 run 方法执行过程如下:

  1. 执行构造函数中指定的任务。
  2. 原子方式更新同步状态,调用AQS.compareAndSetState

  3. 如果上面的原子操作成功,设置代表计算结果的变量 result 的值为Callable.call()的返回指,然后调用AQS.releaseShared(int arg)

  4. AQS.releaseShared(int arg)首先回调 Sync 中的tryReleaseShared(arg)来执行release。这个方法唤醒等待队列中第一个线程。

  5. 调用FutureTask.done()

当调用FutureTask.get()方法时,如果 FutureTask 不是处于执行完成状态 RAN 或已取消状态 CANCELLED。当前执行线程将到 AQS 的线程等待队列中等待。

以上是 Java的Future接口 的全部内容, 来源链接: utcz.com/z/392223.html

回到顶部