记一次工作中使用延迟队列的场景
首先创建一个延迟队列管理类,采用spring 托管
package com.juyi.camera.config;import com.alibaba.fastjson.JSONObject;
import com.juyi.camera.utils.task.DelayTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
/**
* Created by IntelliJ IDEA.
* User: xuzhou
* Date: 2020/3/31
* Time: 9:18
*/
@Component
public class DelayQueueManager implements CommandLineRunner {
private final Logger logger = LoggerFactory.getLogger(DelayQueueManager.class);
private DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
@Resource
private ExecutorService consumerExecutor;
/**
* 加入到延时队列中
*
* @param task
*/
public void put(DelayTask task) {
delayQueue.put(task);
}
@Override
public void run(String... args) {
consumerExecutor.execute(new Thread(this::excuteThread));
}
/**
* 延时任务执行线程
*/
private void excuteThread() {
while (true) {
try {
DelayTask task = delayQueue.take();
processTask(task);
} catch (InterruptedException e) {
break;
}
}
}
/**
* 内部执行延时任务
*
* @param task
*/
private void processTask(DelayTask task) {
JSONObject cardInfo = task.getData().getPayload();
String msisdn = cardInfo.getString("msisdn");
String iccid = cardInfo.getString("iccid");
logger.info("msisdn:{},iccid:{}", msisdn, iccid);
}
}
excuteThread 方法里面 delayQueue 阻塞执行,直到有数据
package com.juyi.camera.utils.task;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* Created by IntelliJ IDEA.
* User: xuzhou
* Date: 2020/3/31
* Time: 9:17
* 延时任务
*/
public class DelayTask implements Delayed {
final private TaskBase data;
final private long expire;
/**
* 构造延时任务
*
* @param data 业务数据
* @param expire 任务延时时间(ms)
*/
public DelayTask(TaskBase data, long expire) {
super();
this.data = data;
this.expire = expire + System.currentTimeMillis();
}
public TaskBase getData() {
return data;
}
public long getExpire() {
return expire;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof DelayTask) {
String msgId = ((DelayTask) obj).getData().getMsgId();
return this.data.getMsgId().equals(msgId);
}
return false;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), unit);
}
@Override
public int compareTo(Delayed o) {
long delta = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return (int) delta;
}
}
DelayTask 为自定义的队列元素 必须实现 Delayed
package com.juyi.camera.utils.task;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
/**
* Created by IntelliJ IDEA.
* User: xuzhou
* Date: 2020/3/31
* Time: 9:17
*/
public class TaskBase {
private String msgId;
private JSONObject payload;
public TaskBase(String msgId) {
this.msgId = msgId;
}
public TaskBase(String msgId, JSONObject payload) {
this.msgId = msgId;
this.payload = payload;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public JSONObject getPayload() {
return payload;
}
public void setPayload(JSONObject payload) {
this.payload = payload;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
TaskBase 是用户要处理的延时任务的数据基类,存放一些自定义数据,例如 设备号、卡号、ID、操作的用户等等
package com.juyi.camera.utils;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created by IntelliJ IDEA.
* User: xuzhou
* Date: 2020/4/29
* Time: 18:54
*/
@Configuration
@Slf4j
public class ThreadPoolUtil implements DisposableBean {
/**
* 使用有界队列,避免OOM
*/
private static ExecutorService consumerExecutor;
/**
* 获取线程池实例
*
* @return 线程池对象
*/
@Bean
public ExecutorService getThreadPool() {
consumerExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.DiscardPolicy());
log.info("ThreadPoolUtil 创建单例:{}", ((ThreadPoolExecutor) consumerExecutor).getActiveCount());
return consumerExecutor;
}
@Override
public void destroy() {
log.info("销毁 consumerExecutor");
consumerExecutor.shutdown();
}
}
ThreadPoolUtil 是spring 托管的线程池,在初始化延迟队列的时候使用了
2 单元测试效果如下,可以很好的满足需求
3 后记
这部分代码只满足了,流量卡复机,如果小于10分钟,执行失败,然后放置的延迟队列里面,10分钟之后再执行这个逻辑,仔细思考下,如果延迟队列里面的流量卡复机操作也失败了,是否还要加上N次重试的机制?N次之后发邮件告警等等。。。
以上是 记一次工作中使用延迟队列的场景 的全部内容, 来源链接: utcz.com/z/516217.html