常用简单消费队列实现

编程

1.场景

很多情况下要用到生产者-消费者这个模型,比如有很多任务要处理,如日志收集、邮件发送等,需采用异步进行处理。但如果生产速度大于消费速度,这个时候就需要队列的支持 和多线程处理。 如果涉及到分布式、消息持久化不在本文讨论方面,建议采用Redis中间件来实现。

2. 实现

a. 消息队列

public class MessageQueue {

// 队列大小

static final int QUEUE_MAX_SIZE = 1000;

static BlockingQueue<AbstractMessageObj> blockingQueue = new LinkedBlockingQueue<AbstractMessageObj>(QUEUE_MAX_SIZE);

/**

* 私有的默认构造子,保证外界无法直接实例化

*/

private MessageQueue() {

}

/**

* 类级的内部类,也就是静态的成员式内部类,该内部类的实例与外部类的实例 没有绑定关系,而且只有被调用到才会装载,从而实现了延迟加载

*/

private static class SingletonHolder {

/**

* 静态初始化器,由JVM来保证线程安全

*/

private static MessageQueue queue = new MessageQueue();

}

// 单例队列

public static MessageQueue getMessageQueue() {

return SingletonHolder.queue;

}

/**

* 生产入队

* @throws InterruptedException

* add(e) 队列未满时,返回true;队列满则抛出IllegalStateException(“Queue full”)异常——AbstractQueue

* put(e) 队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。

* offer(e) 队列未满时,返回true;队列满时返回false。非阻塞立即返回。

* offer(e, time, unit) 设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。

*/

public boolean produce(AbstractMessageObj mail) throws InterruptedException {

return blockingQueue.offer(mail);

}

/**

* 消费出队

* poll() 获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null

* take() 与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒

*/

public AbstractMessageObj consume() throws InterruptedException {

return blockingQueue.take();

}

// 获取队列大小

public int size() {

return blockingQueue.size();

}

}

b.异常消息处理,基于多线程

public  abstract class AbstractConsumeMessageQueue {

private static final Logger logger = LoggerFactory.getLogger(AbstractConsumeMessageQueue.class);

private ExecutorService executorService = Executors.newFixedThreadPool(2,new ThreadFactory() {

int count=1;

@Override

public Thread newThread(Runnable r) {

return new Thread(r, "consume_message_pool_" + (count++));

}

});// 两个大小的固定线程池

/**

* 抽象类实现消息处理逻辑

* @param messageObj

*/

protected abstract void handleMessage(AbstractMessageObj messageObj);

@PostConstruct

public void init() {

executorService.submit(new HandleMessageThread());

executorService.submit(new HandleMessageThread());

}

class HandleMessageThread implements Runnable {

@Override

public void run() {

while (true) {

try {

AbstractMessageObj obj = MessageQueue.getMessageQueue().consume();

if (obj != null) {

logger.info("剩余消息总数:{}", MessageQueue.getMessageQueue().size());

handleMessage(obj);

}

} catch (Exception e) {

logger.error("处理消息出错",e);

}

}

}

}

@PreDestroy

public void destory() {

logger.info("destroy");

executorService.shutdown();

}

}

 

以上是 常用简单消费队列实现 的全部内容, 来源链接: utcz.com/z/517186.html

回到顶部