多线程实现发布订阅升级版遗留问题

编程

1.两个消费者消费消息都到100了,但是下图中的日志未打印出来

这个问题看代码

public class ConsumerObjectOne implements Runnable {

@Override

public void run() {

while (true) {

if (PudConThread.arrayBlockingQueue.size() > 0) {

if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) {

System.out.println("消费者1--消费已达上限停止消费");

return;

}

/**

* 获取最新的一条消息消费

*/

try {

                    //这个地方是关键

MessageVO messageVO = PudConThread.arrayBlockingQueue.take();

System.out.println("消费者1消费消息" + messageVO.toString());

PudConThread.hasConsumerTotal.getAndAdd(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}

ArrayBlockingQueue 里面的take(),当队列里面的长度为空时,会进入await 状态,所以两个消费线程在消费掉最后一条时,队列是空队列,take()阻塞不能进行下次循环,消费结束消息不能打印

将消费线程中的,消费消息代码和判断消费消息数量代码位置对调一下就可以了

public class ConsumerObjectOne implements Runnable {

@Override

public void run() {

while (true) {

if (PudConThread.arrayBlockingQueue.size() > 0) {

/**

* 获取最新的一条消息消费

*/

try {

MessageVO messageVO = PudConThread.arrayBlockingQueue.take();

System.out.println("消费者1消费消息" + messageVO.toString());

PudConThread.hasConsumerTotal.getAndAdd(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) {

System.out.println("消费者1--消费已达上限停止消费");

return;

}

}

}

}

}

在运行结果如下

不对调位置也可以如下改造,将take()换成poll()

public class ConsumerObjectOne implements Runnable {

@Override

public void run() {

while (true) {

if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) {

System.out.println("消费者1--消费已达上限停止消费");

return;

}

if (PudConThread.arrayBlockingQueue.size() > 0) {

/**

* 获取最新的一条消息消费

*/

MessageVO messageVO = PudConThread.arrayBlockingQueue.poll();

if (messageVO != null) {

System.out.println("消费者1消费消息" + messageVO.toString());

PudConThread.hasConsumerTotal.getAndAdd(1);

}

}

}

}

}

第2个问题  序号我们用的是AtomicInteger 但是每次都会出现两个为0的序号

分析:每次出现两个为0的序号是,两个生产者在设置序号的时候用的是 AtomicInteger的 get() 方法这个只是返回当前最新值,所以两个生产者并发去get 获取到了初始值0

代码改造如下

public class ProductObjectOne implements Runnable {

@Override

public void run() {

while (true) {

if (PudConThread.hasProductTotal.get() >= PudConThread.total) {

System.out.println("产品已达上限,停止生产");

return;

}

MessageVO messageVO = new MessageVO(PudConThread.hasProductTotal.getAndAdd(1), UUID.randomUUID().toString(), "ProductObjectOne---this is pubsub test");

try {

PudConThread.arrayBlockingQueue.put(messageVO);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

运行结果如下,未出现重复的序号了

以上是 多线程实现发布订阅升级版遗留问题 的全部内容, 来源链接: utcz.com/z/516429.html

回到顶部