Java并发编程:任务的取消和关闭

编程

要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java 没有提供任何机制来安全的终止线程。但它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作式的方法是必要的,我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。相反,在编写任务和服务时可以使用一种协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。这提供了更好的灵活性,因为任务本身的代码比发出取消请求的代码更清除如何执行清除工作。

正题

在开始文章前,有几个问题需要思考一下:

  • 取消任务的方式由哪几种?
  • 中断的策略是什么?
  • 如何响应中断?

取消任务的方式有哪几种

取消任务的方式大体上有一下两种:

  • 设置取消标志位
  • 中断

设置取消标志位

设置某个“已请求取消”标志,而任务将定期地查看该标志。如果设置了这个标记,那么任务将提前结束。

public class PrimeGenerator implements Runnable {

private static ExecutorService exec = Executors.newCachedThreadPool();

private final List<BigInteger> primes = new ArrayList<BigInteger>();

private volatile boolean cancelled;

public void run() {

BigInteger p = BigInteger.ONE;

while (!cancelled) {

p = p.nextProbablePrime();

synchronized (this) {

primes.add(p);

}

}

}

public void cancel() {

cancelled = true;

}

public synchronized List<BigInteger> get() {

return new ArrayList<BigInteger>(primes);

}

static List<BigInteger> aSecondOfPrimes() throws InterruptedException {

PrimeGenerator generator = new PrimeGenerator();

exec.execute(generator);

try {

SECONDS.sleep(1);

} finally {

generator.cancel();

}

return generator.get();

}

}

上面代码使用了这项技术,其中的 PrimeGenerator 持续地枚举素数,知道它被取消。cancel 方法将设置 cancelled 标志,并且主循环在搜索下一个素数之前会首先检查这个标志(为了使这个过程能可靠的工作,标志 cancelled 必须为 volatile 类型)。

PrimeGenerator 使用了一种简单的取消策略:客户代码通过调用 cancel 来请求取消,PrimeGenerator 在每次搜索素数前首先检查是否存在取消请求,如果存在则退出。

一个可取消的任务必须拥有取消策略,在这个策略中将详细地定义取消操作的“How”“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作。

中断

PrimeGenerator 中的取消机制最终会使得搜索素数的任务退出,但在退出过程中需要花费一定的时间。然而,如果使用这种方法的任务调用了一个阻塞方法,例如 BlockingQueue.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志位,因此永远不会结束。

接下来的代码说明了这个问题。生产者线程生成素数,并将它们放入一个阻塞队列。如果生产者的速度超过了消费者的处理速度,队列将被填满,put 方法也会阻塞。当生产者在 put 方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况?它可以调用 cancel 方法设置 cancelled 标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的 put 方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以 put 方法将一直保持阻塞状态)。

class BrokenPrimeProducer extends Thread {

private final BlockingQueue<BigInteger> queue;

private volatile boolean cancelled = false;

private volatile boolean needMoreStatus = false;

BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {

this.queue = queue;

}

public void run() {

try {

BigInteger p = BigInteger.ONE;

while (!cancelled) {

queue.put(p = p.nextProbablePrime());

}

} catch (InterruptedException consumed) {

}

}

public void cancel() {

cancelled = true;

}

public synchronized BlockingQueue<BigInteger> get() {

return queue;

}

static BlockingQueue<BigInteger> aSecondOfPrimes() throws InterruptedException {

BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(10);

BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);

producer.start();

try {

SECONDS.sleep(1);

} finally {

producer.cancel();

}

return producer.get();

}

}

BrokenPrimeProducer 说明了一些自定义的取消机制无法与可阻塞的库函数实现良好交互的原因。如果任务代码能够响应中断,那么可以使用中断作为取消机制,并且利用许多库类中提供的中断支持。通常,中断是实现取消的最合理方式。

线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

JavaAPI 或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

每个线程都有一个 boolean 类型的中断状态。当中断线程时,这个线程的中断状态将被设置为 true。在 Thread 中包含了中断线程以及查询线程中断状态的方法。。interrupt 方法能中断目标线程,而 isInterrupted 方法能返回目标线程的中断状态。静态的 interrupted 方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

public class Thread{

// 中断线程

public void interrupt() { ... }

// 中断状态

public boolean isInterrupted() { ... }

// 清除中断状态

public static boolean interrupted(){ ... }

}

阻塞库方法,例如 Thread.sleepObject.wait 等,都会检查线程何时中断,并且在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出 InterruptedException,表示阻塞操作由于中断而提前结束。JVM 并不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。通过这样的方法,中断操作将变得“有黏性”——如果不触发 InterruptedException,那么中断状态一直保持,直到明确地清除中断状态。

调用 interrupt 并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

对中断操作的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己(这些时刻也被称为取消点)。有些方法,例如 waitsleepjoin 等,将严格地处理这种请求,当它们收到中断请求或者在开始执行时发现某个已经被设置好的中断状态时,将抛出一个异常。设计良好的方法可以完全忽略这种请求,只要它们能使调用代码对中断请求进行某种处理。设计槽糕的方法可能会屏蔽中断请求,从而导致调用栈中的其他代码无法对中断请求做出响应。

在使用静态的 interrupted 时应该小心,因为它会清除当前线程的中断状态。如果调用 interrupted 时返回了 true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出 InterruptedException,或者通过再次调用 interrupt 来恢复中断状态。

public class PrimeProducer extends Thread {

private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue) {

this.queue = queue;

}

public void run() {

try {

BigInteger p = BigInteger.ONE;

while (!Thread.currentThread().isInterrupted()) {

queue.put(p = p.nextProbablePrime());

}

} catch (InterruptedException consumed) {

/* Allow thread to exit */

}

}

public void cancel() {

interrupt();

}

public synchronized BlockingQueue<BigInteger> get() {

return queue;

}

}

在上面代码中,在每次迭代循环中,有两个位置可以检测出中断:在阻塞的 put 方法调用中,以及在循环开始处查询中断状态时。由于调用了阻塞的 put 方法,因此这里并不一定需要进行显式的检测,但执行检测却会使 PrimeProducer 对中断具有更高的响应性,因为它是在启动寻找素数任务之前检查中断的,而不是在任务完成之后。如果可中断的阻塞方法的调用频率并不高,不足以获得足够的响应性,那么显式的检测中断状态能起到一定的帮助作用。

中断策略是什么

正如任务中应该包含取消策略一样,线程同样应该包含中断策略中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多块的速度来响应中断。

最合理的中断策略是某种形式的线程级取消操作或服务级取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略,例如暂停服务或重新开始服务,但对于那些包含非标准终端策略的线程或线程池,只能用于能知道这些策略的任务中。

区分任务和线程对中断的反应是很重要的。一个中断请求可以有一个或多个接收者——中断线程池中的某个工作者线程,同时意味着“取消当前任务”和“关闭工作者线程”。

任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心的保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。

这就是为什么大多数可阻塞的库函数都只是抛出 InterruptedException 作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因为它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以取消进一步的操作。

当检查到中断请求时,任务并不需要放弃所有的操作——它可以推迟处理中断请求,并直到某个更合适的时刻。因此需要记住中断请求,并在完成当前任务后抛出 InterruptedException 或者表示已经收到中断请求。这项技术能够确保在更新过程中发生中断时,数据结构不会被破坏。

任务不应该对执行该任务的线程的中断策略做出任何假设,除非该任务被专门设计为在服务中运行,并且在这些服务中心包含特定的中断策略。无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将 InterruptedException 传递给调用者外还需要执行其他操作,那么应该在捕获 InterruptedException 之后恢复中断状态:

Thread.currentThread().interrupt();

正如任务代码不应该对其执行所在的线程的中断策略做出假设,执行取消操作的代码也不应该对线程的中断策略做出假设。线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如关闭(shutdown)方法。

由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

如何响应中断

当调用可中断的阻塞库函数时,例如 Thread.sleepBlockingQueue.put 等,有两种使用策略可用来处理 InterruptedException

传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。

恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。

传递 InterruptedException 与将 InterruptedException 添加到 throws 字句中一样容易,如下代码清单:

private BlockingQueue<BigInteger> queue;

...

public BigInteger getNextInteger() throws InterruptedException {

return queue.take();

}

如果不想或无法传递 InterruptedException(或许通过 Runnable 来定义任务),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是再次调用 interrupt 来恢复中断状态。你不能屏蔽 InterruptedException,例如在 catch 块中捕获到异常却不做任何处理,除非在你的代码中实现了线程的中断策略。虽然 PrimeProducer 屏蔽了中断,但这是因为它已经知道线程将要结束,因此在调用栈中已经没有上层代码需要知道中断信息。由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获 InterruptedException 时恢复状态。

public BigInteger getNextInteger(BlockingQueue<BigInteger> queue) {

boolean interrupted = false;

try {

while (true) {

try {

return queue.take();

} catch (InterruptedException e) {

interrupted = true;

// 重新尝试

}

}

} finally {

if(interrupted)

Thread.currentThread().interrupt();

}

}

如上代码,如果过早的设置中断状态,就可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现该状态已经被设置时会立即抛出 InterruptedException(通常,可中断的方法会在阻塞或进行重要的工作前首先检查中断,从而尽快地响应中断)。

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么不应该调用那些执行时间较长并且不响应中断的方法,从而对可调用的库代码进行一些限制。

在取消过程中可能涉及除了中断状态之外的其他状态。中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步指示(当访问这些信息时,要确保使用同步)。

参考地址

  • https://blog.csdn.net/dilixinxixitong2009/article/details/79752404

如果大家喜欢我的文章,可以关注个人订阅号。欢迎随时留言、交流。如果想加入微信群的话一起讨论的话,请加管理员简栈文化-小助手(lastpass4u),他会拉你们进群。

以上是 Java并发编程:任务的取消和关闭 的全部内容, 来源链接: utcz.com/z/515091.html

回到顶部