记一次工作中使用延迟队列的场景

编程

首先创建一个延迟队列管理类,采用spring 托管

package com.juyi.camera.config;

import com.alibaba.fastjson.JSONObject;

import com.juyi.camera.utils.task.DelayTask;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.boot.CommandLineRunner;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.util.concurrent.DelayQueue;

import java.util.concurrent.ExecutorService;

/**

* Created by IntelliJ IDEA.

* User: xuzhou

* Date: 2020/3/31

* Time: 9:18

*/

@Component

public class DelayQueueManager implements CommandLineRunner {

private final Logger logger = LoggerFactory.getLogger(DelayQueueManager.class);

private DelayQueue<DelayTask> delayQueue = new DelayQueue<>();

@Resource

private ExecutorService consumerExecutor;

/**

* 加入到延时队列中

*

* @param task

*/

public void put(DelayTask task) {

delayQueue.put(task);

}

@Override

public void run(String... args) {

consumerExecutor.execute(new Thread(this::excuteThread));

}

/**

* 延时任务执行线程

*/

private void excuteThread() {

while (true) {

try {

DelayTask task = delayQueue.take();

processTask(task);

} catch (InterruptedException e) {

break;

}

}

}

/**

* 内部执行延时任务

*

* @param task

*/

private void processTask(DelayTask task) {

JSONObject cardInfo = task.getData().getPayload();

String msisdn = cardInfo.getString("msisdn");

String iccid = cardInfo.getString("iccid");

logger.info("msisdn:{},iccid:{}", msisdn, iccid);

}

}

excuteThread 方法里面 delayQueue 阻塞执行,直到有数据

package com.juyi.camera.utils.task;

import java.util.concurrent.Delayed;

import java.util.concurrent.TimeUnit;

/**

* Created by IntelliJ IDEA.

* User: xuzhou

* Date: 2020/3/31

* Time: 9:17

* 延时任务

*/

public class DelayTask implements Delayed {

final private TaskBase data;

final private long expire;

/**

* 构造延时任务

*

* @param data 业务数据

* @param expire 任务延时时间(ms)

*/

public DelayTask(TaskBase data, long expire) {

super();

this.data = data;

this.expire = expire + System.currentTimeMillis();

}

public TaskBase getData() {

return data;

}

public long getExpire() {

return expire;

}

@Override

public boolean equals(Object obj) {

if (obj instanceof DelayTask) {

String msgId = ((DelayTask) obj).getData().getMsgId();

return this.data.getMsgId().equals(msgId);

}

return false;

}

@Override

public long getDelay(TimeUnit unit) {

return unit.convert(this.expire - System.currentTimeMillis(), unit);

}

@Override

public int compareTo(Delayed o) {

long delta = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);

return (int) delta;

}

}

DelayTask 为自定义的队列元素 必须实现 Delayed

package com.juyi.camera.utils.task;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

/**

* Created by IntelliJ IDEA.

* User: xuzhou

* Date: 2020/3/31

* Time: 9:17

*/

public class TaskBase {

private String msgId;

private JSONObject payload;

public TaskBase(String msgId) {

this.msgId = msgId;

}

public TaskBase(String msgId, JSONObject payload) {

this.msgId = msgId;

this.payload = payload;

}

public String getMsgId() {

return msgId;

}

public void setMsgId(String msgId) {

this.msgId = msgId;

}

public JSONObject getPayload() {

return payload;

}

public void setPayload(JSONObject payload) {

this.payload = payload;

}

@Override

public String toString() {

return JSON.toJSONString(this);

}

}

TaskBase 是用户要处理的延时任务的数据基类,存放一些自定义数据,例如 设备号、卡号、ID、操作的用户等等

package com.juyi.camera.utils;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**

* Created by IntelliJ IDEA.

* User: xuzhou

* Date: 2020/4/29

* Time: 18:54

*/

@Configuration

@Slf4j

public class ThreadPoolUtil implements DisposableBean {

/**

* 使用有界队列,避免OOM

*/

private static ExecutorService consumerExecutor;

/**

* 获取线程池实例

*

* @return 线程池对象

*/

@Bean

public ExecutorService getThreadPool() {

consumerExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.DiscardPolicy());

log.info("ThreadPoolUtil 创建单例:{}", ((ThreadPoolExecutor) consumerExecutor).getActiveCount());

return consumerExecutor;

}

@Override

public void destroy() {

log.info("销毁 consumerExecutor");

consumerExecutor.shutdown();

}

}

ThreadPoolUtil 是spring 托管的线程池,在初始化延迟队列的时候使用了

2 单元测试效果如下,可以很好的满足需求

3 后记

这部分代码只满足了,流量卡复机,如果小于10分钟,执行失败,然后放置的延迟队列里面,10分钟之后再执行这个逻辑,仔细思考下,如果延迟队列里面的流量卡复机操作也失败了,是否还要加上N次重试的机制?N次之后发邮件告警等等。。。

以上是 记一次工作中使用延迟队列的场景 的全部内容, 来源链接: utcz.com/z/516217.html

回到顶部