【Java】Java-基于LinkedList手写一个消息队列
Java-基于LinkedList手写一个消息队列
善良的小黑哥发布于 今天 07:43
前言
日常开发中,我们最常用的集合主要有两个,一个是ArrayList,一个是LinkedList
如果对ArrayList还有不明白的同学,可以看一下我之前写的一篇文章:Java集合,ArrayList源码深入解析
那么本篇文章,我们主要是基于LinkedList写一个简单的队列。
设计思路:
首先我们想要设计一个方案的时候,要先捋清楚思路,想一下现有的,别人已经实现的方案,然后思考自己如何才能实现。(比如rabbitMq)
队列管理中心:集中管理所有创建的队列
提供方:往消息队列中发送消息
消费方:监听消息队列中的消息,并进行消费(如果监听到队列中新放入了消息,则自动消费处理)
第一步:实现消息队列
package com.dm.black.modules.myQuere;import java.util.LinkedList;
/**
* 基于LinkedList实现消息队列
* @author wjy
* @date 2021/1/20
*/
public class MQueue extends QueueCenter {
private LinkedList<Object> queue = new LinkedList<>();
/**
* 注意:这里加锁是为了防止并发操作,因为LinkedList本身是线程不安全的
* @method 放入消息
* @param o
* @return
*/
public boolean putMessage(Object o) {
synchronized (queue) {
// 如果队列在等待,则执行唤醒
if (queue.isEmpty()) {
System.out.println("唤醒队列...");
queue.notifyAll();
}
// 将消息放入队列
queue.push(o);
return true;
}
}
/**
* @method 获得消息(获取首条消息并删除)
* @return
*/
public Object pollFirst() {
synchronized (queue) {
// 如果队列中没有消息,则处于堵塞状态,有消息则进行消费
if (queue.isEmpty()) {
try {
System.out.println("队列中没有数据,开始等待....");
queue.wait();
// 被唤醒后,继续往下执行
Object o = queue.pollFirst();
return o;
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
Object o = queue.pollFirst();
return o;
}
}
return null;
}
/**
* 获得消息(获取首条消息但不删除)
* @return
*/
public Object getFrist(){
synchronized (queue) {
Object first = queue.getFirst();
return first;
}
}
/**
* 队列中是否存在消息
* @return
*/
public boolean isReady() {
if (!queue.isEmpty()) {
return true;
}
return false;
}
}
第二步:实现消息队列管理中心
package com.dm.black.modules.myQuere;import java.util.HashMap;
import java.util.Map;
/**
* Queue-center
* @author wjy
* @date 2021/1/20
*/
public class QueueCenter {
/**
* @description 这里使用Map 作为队列管理中心
* 创建一个queue管理中心,所有创建的Queue在这里进行管理
* Map -> key : queue名称
* Map -> value : 队列
*/
private static Map<String, MQueue> queueCenter = new HashMap<>();
/**
* @method 从 Queue-center 获取 Queue
* 加锁目的:防止同时创建相同名称的queue
*/
public static MQueue getQueue(String queueName) {
synchronized (queueName) {
// 从map中根据名称获取队列,如果已经存在,则返回map中的队列
MQueue queue = queueCenter.get(queueName);
// 如果是第一次创建队列,则新建队列并放入map,然后将新建的队列返回
if (queue == null) {
queue = new MQueue();
putQueue(queueName, queue);
MQueue mQueue = queueCenter.get(queueName);
return mQueue;
}
return queue;
}
}
/**
* @method 将 Queue 放入 Queue-center
* @param queueName
*/
private static void putQueue(String queueName, MQueue queue) {
queueCenter.put(queueName, queue);
}
}
第三步:消息注册
package com.dm.black.modules.myQuere;/**
* 注册者
* @author wjy
* @date 2021/1/20
*/
public class MProvider {
// 队列名称
private final String queueName = "demo";
/**
* 发送消息到'demo'队列
* @param message
*/
public void sendMessage(String message) {
// 获取到queue
MQueue queue = QueueCenter.getQueue(queueName);
// 放入消息
queue.putMessage(message);
System.out.println("提供者:" + queueName + ": 发送消息:" + message);
}
}
第四步:消息者实现
package com.dm.black.modules.myQuere;import java.util.LinkedList;
/**
* 消费者
* @author wjy
* @date 2021/1/20
*/
public class MConsumer {
private final String queueName = "demo";
/**
* 接收消息并删除
*/
public void receiveMessageAndDelete() {
MQueue queue = QueueCenter.getQueue(queueName);
// 如果队列中存在消息,则一直处于消费状态
while (true) {
// 消费消息, 执行巴拉巴拉一大堆业务处理后删除
Object o = queue.pollFirst();
System.out.println("消费者:" + queueName + ": 接收到消息:" + o.toString());
}
}
}
package com.dm.black;import com.dm.black.modules.myQuere.MConsumer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@MapperScan("com.dm.black.modules.*.mapper")
@SpringBootApplication
public class BlackApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(BlackApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
// 启动消费者
MConsumer mConsumer = new MConsumer();
mConsumer.receiveMessageAndDelete();
}
}
测试
package com.dm.black.modules.myQuere;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author wjy
* @date 2021/1/22
*/
@RestController
@RequestMapping("/mQueue")
public class MQueueController {
@GetMapping("/sendMessage")
public String sendMessage(String message) {
MProvider mProvider = new MProvider();
mProvider.sendMessage(message);
return "success";
}
}
我们看到,当项目启动的时候,消费者已经处于就绪状态,队列中没有消息,所以处于堵塞状态,当监听到消息后,立马工作进行消费。
我们调用一下
看一下控制台输出
可以看到,提供者第一次注册消息时,将队列唤醒,并注册到队列中,消费者监听到消息,立马开始工作。
到这里我们已经实现了一个简易版的消息队列,如果对大家有帮助,希望多多支持。
因为没有新起项目去做这个Demo,所以就不提供源码了,大家自行copy我贴出来的代码吧
javaspring后端集合linkedlist
阅读 31发布于 今天 07:43
本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
善良的小黑哥
不要因为没有掌声而放弃努力!
1 声望
0 粉丝
善良的小黑哥
不要因为没有掌声而放弃努力!
1 声望
0 粉丝
宣传栏
前言
日常开发中,我们最常用的集合主要有两个,一个是ArrayList,一个是LinkedList
如果对ArrayList还有不明白的同学,可以看一下我之前写的一篇文章:Java集合,ArrayList源码深入解析
那么本篇文章,我们主要是基于LinkedList写一个简单的队列。
设计思路:
首先我们想要设计一个方案的时候,要先捋清楚思路,想一下现有的,别人已经实现的方案,然后思考自己如何才能实现。(比如rabbitMq)
队列管理中心:集中管理所有创建的队列
提供方:往消息队列中发送消息
消费方:监听消息队列中的消息,并进行消费(如果监听到队列中新放入了消息,则自动消费处理)
第一步:实现消息队列
package com.dm.black.modules.myQuere;import java.util.LinkedList;
/**
* 基于LinkedList实现消息队列
* @author wjy
* @date 2021/1/20
*/
public class MQueue extends QueueCenter {
private LinkedList<Object> queue = new LinkedList<>();
/**
* 注意:这里加锁是为了防止并发操作,因为LinkedList本身是线程不安全的
* @method 放入消息
* @param o
* @return
*/
public boolean putMessage(Object o) {
synchronized (queue) {
// 如果队列在等待,则执行唤醒
if (queue.isEmpty()) {
System.out.println("唤醒队列...");
queue.notifyAll();
}
// 将消息放入队列
queue.push(o);
return true;
}
}
/**
* @method 获得消息(获取首条消息并删除)
* @return
*/
public Object pollFirst() {
synchronized (queue) {
// 如果队列中没有消息,则处于堵塞状态,有消息则进行消费
if (queue.isEmpty()) {
try {
System.out.println("队列中没有数据,开始等待....");
queue.wait();
// 被唤醒后,继续往下执行
Object o = queue.pollFirst();
return o;
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
Object o = queue.pollFirst();
return o;
}
}
return null;
}
/**
* 获得消息(获取首条消息但不删除)
* @return
*/
public Object getFrist(){
synchronized (queue) {
Object first = queue.getFirst();
return first;
}
}
/**
* 队列中是否存在消息
* @return
*/
public boolean isReady() {
if (!queue.isEmpty()) {
return true;
}
return false;
}
}
第二步:实现消息队列管理中心
package com.dm.black.modules.myQuere;import java.util.HashMap;
import java.util.Map;
/**
* Queue-center
* @author wjy
* @date 2021/1/20
*/
public class QueueCenter {
/**
* @description 这里使用Map 作为队列管理中心
* 创建一个queue管理中心,所有创建的Queue在这里进行管理
* Map -> key : queue名称
* Map -> value : 队列
*/
private static Map<String, MQueue> queueCenter = new HashMap<>();
/**
* @method 从 Queue-center 获取 Queue
* 加锁目的:防止同时创建相同名称的queue
*/
public static MQueue getQueue(String queueName) {
synchronized (queueName) {
// 从map中根据名称获取队列,如果已经存在,则返回map中的队列
MQueue queue = queueCenter.get(queueName);
// 如果是第一次创建队列,则新建队列并放入map,然后将新建的队列返回
if (queue == null) {
queue = new MQueue();
putQueue(queueName, queue);
MQueue mQueue = queueCenter.get(queueName);
return mQueue;
}
return queue;
}
}
/**
* @method 将 Queue 放入 Queue-center
* @param queueName
*/
private static void putQueue(String queueName, MQueue queue) {
queueCenter.put(queueName, queue);
}
}
第三步:消息注册
package com.dm.black.modules.myQuere;/**
* 注册者
* @author wjy
* @date 2021/1/20
*/
public class MProvider {
// 队列名称
private final String queueName = "demo";
/**
* 发送消息到'demo'队列
* @param message
*/
public void sendMessage(String message) {
// 获取到queue
MQueue queue = QueueCenter.getQueue(queueName);
// 放入消息
queue.putMessage(message);
System.out.println("提供者:" + queueName + ": 发送消息:" + message);
}
}
第四步:消息者实现
package com.dm.black.modules.myQuere;import java.util.LinkedList;
/**
* 消费者
* @author wjy
* @date 2021/1/20
*/
public class MConsumer {
private final String queueName = "demo";
/**
* 接收消息并删除
*/
public void receiveMessageAndDelete() {
MQueue queue = QueueCenter.getQueue(queueName);
// 如果队列中存在消息,则一直处于消费状态
while (true) {
// 消费消息, 执行巴拉巴拉一大堆业务处理后删除
Object o = queue.pollFirst();
System.out.println("消费者:" + queueName + ": 接收到消息:" + o.toString());
}
}
}
package com.dm.black;import com.dm.black.modules.myQuere.MConsumer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@MapperScan("com.dm.black.modules.*.mapper")
@SpringBootApplication
public class BlackApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(BlackApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
// 启动消费者
MConsumer mConsumer = new MConsumer();
mConsumer.receiveMessageAndDelete();
}
}
测试
package com.dm.black.modules.myQuere;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author wjy
* @date 2021/1/22
*/
@RestController
@RequestMapping("/mQueue")
public class MQueueController {
@GetMapping("/sendMessage")
public String sendMessage(String message) {
MProvider mProvider = new MProvider();
mProvider.sendMessage(message);
return "success";
}
}
我们看到,当项目启动的时候,消费者已经处于就绪状态,队列中没有消息,所以处于堵塞状态,当监听到消息后,立马工作进行消费。
我们调用一下
看一下控制台输出
可以看到,提供者第一次注册消息时,将队列唤醒,并注册到队列中,消费者监听到消息,立马开始工作。
到这里我们已经实现了一个简易版的消息队列,如果对大家有帮助,希望多多支持。
因为没有新起项目去做这个Demo,所以就不提供源码了,大家自行copy我贴出来的代码吧
以上是 【Java】Java-基于LinkedList手写一个消息队列 的全部内容, 来源链接: utcz.com/a/106612.html
得票时间