常用简单消费队列实现
1.场景
很多情况下要用到生产者-消费者这个模型,比如有很多任务要处理,如日志收集、邮件发送等,需采用异步进行处理。但如果生产速度大于消费速度,这个时候就需要队列的支持 和多线程处理。 如果涉及到分布式、消息持久化不在本文讨论方面,建议采用Redis中间件来实现。
2. 实现
a. 消息队列
public class MessageQueue { // 队列大小
static final int QUEUE_MAX_SIZE = 1000;
static BlockingQueue<AbstractMessageObj> blockingQueue = new LinkedBlockingQueue<AbstractMessageObj>(QUEUE_MAX_SIZE);
/**
* 私有的默认构造子,保证外界无法直接实例化
*/
private MessageQueue() {
}
/**
* 类级的内部类,也就是静态的成员式内部类,该内部类的实例与外部类的实例 没有绑定关系,而且只有被调用到才会装载,从而实现了延迟加载
*/
private static class SingletonHolder {
/**
* 静态初始化器,由JVM来保证线程安全
*/
private static MessageQueue queue = new MessageQueue();
}
// 单例队列
public static MessageQueue getMessageQueue() {
return SingletonHolder.queue;
}
/**
* 生产入队
* @throws InterruptedException
* add(e) 队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue
* put(e) 队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。
* offer(e) 队列未满时,返回true;队列满时返回false。非阻塞立即返回。
* offer(e, time, unit) 设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。
*/
public boolean produce(AbstractMessageObj mail) throws InterruptedException {
return blockingQueue.offer(mail);
}
/**
* 消费出队
* poll() 获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null
* take() 与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒
*/
public AbstractMessageObj consume() throws InterruptedException {
return blockingQueue.take();
}
// 获取队列大小
public int size() {
return blockingQueue.size();
}
}
b.异常消息处理,基于多线程
public abstract class AbstractConsumeMessageQueue { private static final Logger logger = LoggerFactory.getLogger(AbstractConsumeMessageQueue.class);
private ExecutorService executorService = Executors.newFixedThreadPool(2,new ThreadFactory() {
int count=1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "consume_message_pool_" + (count++));
}
});// 两个大小的固定线程池
/**
* 抽象类实现消息处理逻辑
* @param messageObj
*/
protected abstract void handleMessage(AbstractMessageObj messageObj);
@PostConstruct
public void init() {
executorService.submit(new HandleMessageThread());
executorService.submit(new HandleMessageThread());
}
class HandleMessageThread implements Runnable {
@Override
public void run() {
while (true) {
try {
AbstractMessageObj obj = MessageQueue.getMessageQueue().consume();
if (obj != null) {
logger.info("剩余消息总数:{}", MessageQueue.getMessageQueue().size());
handleMessage(obj);
}
} catch (Exception e) {
logger.error("处理消息出错",e);
}
}
}
}
@PreDestroy
public void destory() {
logger.info("destroy");
executorService.shutdown();
}
}
以上是 常用简单消费队列实现 的全部内容, 来源链接: utcz.com/z/517186.html