多线程实现发布订阅升级版遗留问题
1.两个消费者消费消息都到100了,但是下图中的日志未打印出来
这个问题看代码
public class ConsumerObjectOne implements Runnable {@Override
public void run() {
while (true) {
if (PudConThread.arrayBlockingQueue.size() > 0) {
if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) {
System.out.println("消费者1--消费已达上限停止消费");
return;
}
/**
* 获取最新的一条消息消费
*/
try {
//这个地方是关键
MessageVO messageVO = PudConThread.arrayBlockingQueue.take();
System.out.println("消费者1消费消息" + messageVO.toString());
PudConThread.hasConsumerTotal.getAndAdd(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
ArrayBlockingQueue 里面的take(),当队列里面的长度为空时,会进入await 状态,所以两个消费线程在消费掉最后一条时,队列是空队列,take()阻塞不能进行下次循环,消费结束消息不能打印
将消费线程中的,消费消息代码和判断消费消息数量代码位置对调一下就可以了
public class ConsumerObjectOne implements Runnable {@Override
public void run() {
while (true) {
if (PudConThread.arrayBlockingQueue.size() > 0) {
/**
* 获取最新的一条消息消费
*/
try {
MessageVO messageVO = PudConThread.arrayBlockingQueue.take();
System.out.println("消费者1消费消息" + messageVO.toString());
PudConThread.hasConsumerTotal.getAndAdd(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) {
System.out.println("消费者1--消费已达上限停止消费");
return;
}
}
}
}
}
在运行结果如下
不对调位置也可以如下改造,将take()换成poll()
public class ConsumerObjectOne implements Runnable {@Override
public void run() {
while (true) {
if (PudConThread.hasConsumerTotal.get() >= PudConThread.total) {
System.out.println("消费者1--消费已达上限停止消费");
return;
}
if (PudConThread.arrayBlockingQueue.size() > 0) {
/**
* 获取最新的一条消息消费
*/
MessageVO messageVO = PudConThread.arrayBlockingQueue.poll();
if (messageVO != null) {
System.out.println("消费者1消费消息" + messageVO.toString());
PudConThread.hasConsumerTotal.getAndAdd(1);
}
}
}
}
}
第2个问题 序号我们用的是AtomicInteger 但是每次都会出现两个为0的序号
分析:每次出现两个为0的序号是,两个生产者在设置序号的时候用的是 AtomicInteger的 get() 方法这个只是返回当前最新值,所以两个生产者并发去get 获取到了初始值0
代码改造如下
public class ProductObjectOne implements Runnable {@Override
public void run() {
while (true) {
if (PudConThread.hasProductTotal.get() >= PudConThread.total) {
System.out.println("产品已达上限,停止生产");
return;
}
MessageVO messageVO = new MessageVO(PudConThread.hasProductTotal.getAndAdd(1), UUID.randomUUID().toString(), "ProductObjectOne---this is pubsub test");
try {
PudConThread.arrayBlockingQueue.put(messageVO);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果如下,未出现重复的序号了
以上是 多线程实现发布订阅升级版遗留问题 的全部内容, 来源链接: utcz.com/z/516429.html