RabbitMQ常用模式快速上手
注意配置
连接工厂基本配置
import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;
/**
* @author yezixun
* @date 2019/11/7 14:35
*/
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
//默认账号为
factory.setVirtualHost("testhost");
factory.setUsername("admin");
factory.setPassword("admin");
// 通过工厂获取连接
return factory.newConnection();
}
}
Queue
发送
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 14:38
*/
public class Send {
//队列名称
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
//关闭通道和连接
channel.close();
connection.close();
}
}
接收
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 14:48
*/
public class Recv {
//队列名称
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received "" + message + """);
}
}
}
work
发送
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 15:26
*/
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送0-99的数字
for (int i = 0; i < 100; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
//线程沉睡 数字越大睡得越久
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
接收
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 15:25
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
// true 自动确认:无论是否成功 都认为成功消费
// false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received "" + message + """);
// 休眠1秒 模拟数据量 接收间隔比Recv1长 模拟性能(压力)弱的服务器
Thread.sleep(1000);
//下面这行注释掉表示使用自动确认模式 开启表示手动
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 15:23
*/
public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
// true 自动确认:无论是否成功 都认为成功消费
// false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [y] Received "" + message + """);
//休眠 模拟数据量 接收间隔比Recv2短 模拟性能(压力)强的服务器
Thread.sleep(10);
// 返回确认状态,注释掉表示使用自动确认模式 开启表示手动
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消息的确认模式
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
发布订阅模式
发送
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* 发布订阅模式
* @author yezixun
* @date 2019/11/7 15:54
*/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
channel.close();
connection.close();
}
}
接收
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 15:55
*/
public class Recv {
private final static String QUEUE_NAME = "test_queue_work1";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv] Received "" + message + """);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 15:55
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv2] Received "" + message + """);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
路由模式
发送
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* 路由模式
* @author yezixun
* @date 2019/11/7 16:26
*/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "添加";
channel.basicPublish(EXCHANGE_NAME, "select",null, message.getBytes());
//String message = "删除";
//channel.basicPublish(EXCHANGE_NAME, "delete",null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
channel.close();
connection.close();
}
}
接收
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 15:23
*/
public class Recv {
private final static String QUEUE_NAME = "test_queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机 删除和修改
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
// true 自动确认:无论是否成功 都认为成功消费
// false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [y] Received "" + message + """);
// 返回确认状态,注释掉表示使用自动确认模式 开启表示手动
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 15:23
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机 查询和增加
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
// true 自动确认:无论是否成功 都认为成功消费
// false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [y] Received "" + message + """);
// 返回确认状态,注释掉表示使用自动确认模式 开启表示手动
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
主题模式
发送
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 16:45
*/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "Hello World!!";
channel.basicPublish(EXCHANGE_NAME, "routekey.", null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
channel.close();
connection.close();
}
}
接收
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 16:46
*/
public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_work_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv_x] Received "" + message + """);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.zhonglouguairen.util.ConnectionUtil;
/**
* @author yezixun
* @date 2019/11/7 16:46
*/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_topic_work_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv2_x] Received "" + message + """);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
————————————————
版权声明:本文中大部分为CSDN博主「niaobirdfly」的原创 详细信息参考原作者
原文链接:https://blog.csdn.net/hellozpc/article/details/81436980
以上是 RabbitMQ常用模式快速上手 的全部内容, 来源链接: utcz.com/z/510519.html