阻塞队列和多线程使用者,如何知道何时停止

我有一个单一的线程生产者,它创建了一些任务对象,然后将它们添加到一个ArrayBlockingQueue(大小固定的)对象中。

我还启动了一个多线程使用者。这是作为固定线程池(Executors.newFixedThreadPool(threadCount);)构建的。然后,我向该threadPool提交一些ConsumerWorker实例,每个ConsumerWorker都引用了上述ArrayBlockingQueue实例。

每个这样的Worker将take()在队列中执行并处理任务。

我的问题是,什么时候不再需要其他工作,让工人知道的最佳方法是什么。换句话说,我如何告诉工人生产者已经完成添加到队列,从这一点开始,每个工人在看到队列为空时都应该停止。

我现在得到的是一个设置,其中,生产者使用一个回调初始化,当他完成工作时(将内容添加到队列中),该回调将被触发。我还保留了我创建并提交给ThreadPool的所有ConsumerWorkers的列表。当生产者回叫告诉我生产者完成时,我可以告诉每个工人。在这一点上,他们应该继续检查队列是否不为空,当队列为空时,它们应该停止,从而使我能够正常关闭ExecutorService线程池。是这样的

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {

this.inputQueue = inputQueue;

}

@Override

public void run() {

//worker loop keeps taking en element from the queue as long as the producer is still running or as

//long as the queue is not empty:

while(isRunning || !inputQueue.isEmpty()) {

System.out.println("Consumer "+Thread.currentThread().getName()+" START");

try {

Object queueElement = inputQueue.take();

//process queueElement

} catch (Exception e) {

e.printStackTrace();

}

}

}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue

public void setRunning(boolean isRunning) {

this.isRunning = isRunning;

}

}

这里的问题是我有一个明显的竞争条件,有时生产者会完成,发信号,而ConsumerWorkers会在消耗队列中的所有内容之前停止。

我的问题是什么是最好的同步方式,以便一切正常?我是否应该同步检查生产者是否正在运行的整个部分,以及队列是否为空,并在一个块中(在队列对象上)从队列中取出内容?我是否应该仅isRunning在ConsumerWorker实例上同步布尔值的更新?还有其他建议吗?

更新,这里是我最终使用的工作实现:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1);

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {

this.inputQueue = inputQueue;

}

@Override

public void run() {

//worker loop keeps taking en element from the queue as long as the producer is still running or as

//long as the queue is not empty:

while(true) {

System.out.println("Consumer "+Thread.currentThread().getName()+" START");

try {

Produced queueElement = inputQueue.take();

Thread.sleep(new Random().nextInt(100));

if(queueElement==POISON) {

break;

}

//process queueElement

} catch (Exception e) {

e.printStackTrace();

}

System.out.println("Consumer "+Thread.currentThread().getName()+" END");

}

}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue

public void stopRunning() {

try {

inputQueue.put(POISON);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

这主要是受到JohnVint的以下回答的启发,仅作了一些小改动。

===由于@vendhan的评论而进行了更新。

谢谢您的观察。没错,这个问题的第一个代码片段(以及其他问题)while(isRunning ||

!inputQueue.isEmpty())确实没有什么意义。

在此的最终实际实现中,我所做的事情与您建议替换“ ||”更接近 (或)和“

&&”(和),从某种意义上说,每个工人(消费者)现在仅检查从列表中得到的元素是否是毒药,如果停止,则停止(因此理论上我们可以说该工人有且队列不能为空)。

回答:

您应该take()从队列继续。您可以使用毒药告诉工人停下来。例如:

private final Object POISON_PILL = new Object();

@Override

public void run() {

//worker loop keeps taking en element from the queue as long as the producer is still running or as

//long as the queue is not empty:

while(isRunning) {

System.out.println("Consumer "+Thread.currentThread().getName()+" START");

try {

Object queueElement = inputQueue.take();

if(queueElement == POISON_PILL) {

inputQueue.add(POISON_PILL);//notify other threads to stop

return;

}

//process queueElement

} catch (Exception e) {

e.printStackTrace();

}

}

}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue

public void finish() {

//you can also clear here if you wanted

isRunning = false;

inputQueue.add(POISON_PILL);

}

以上是 阻塞队列和多线程使用者,如何知道何时停止 的全部内容, 来源链接: utcz.com/qa/401460.html

回到顶部