RabbitMQ常用模式快速上手

编程

注意配置

连接工厂基本配置

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

/**

* @author yezixun

* @date 2019/11/7 14:35

*/

public class ConnectionUtil {

public static Connection getConnection() throws Exception {

//定义连接工厂

ConnectionFactory factory = new ConnectionFactory();

//设置服务地址

factory.setHost("localhost");

//端口

factory.setPort(5672);

//设置账号信息,用户名、密码、vhost

//默认账号为

factory.setVirtualHost("testhost");

factory.setUsername("admin");

factory.setPassword("admin");

// 通过工厂获取连接

return factory.newConnection();

}

}

Queue

发送

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 14:38

*/

public class Send {

//队列名称

private final static String QUEUE_NAME = "q_test_01";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

// 从连接中创建通道

Channel channel = connection.createChannel();

// 声明(创建)队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 消息内容

String message = "Hello World!";

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent "" + message + """);

//关闭通道和连接

channel.close();

connection.close();

}

}

接收

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 14:48

*/

public class Recv {

//队列名称

private final static String QUEUE_NAME = "q_test_01";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

// 从连接中创建通道

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列

channel.basicConsume(QUEUE_NAME, true, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [x] Received "" + message + """);

}

}

}

work

发送

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 15:26

*/

public class Send {

private final static String QUEUE_NAME = "test_queue_work";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//发送0-99的数字

for (int i = 0; i < 100; i++) {

// 消息内容

String message = "" + i;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println(" [x] Sent "" + message + """);

//线程沉睡 数字越大睡得越久

Thread.sleep(i * 10);

}

channel.close();

connection.close();

}

}

接收

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 15:25

*/

public class Recv2 {

private final static String QUEUE_NAME = "test_queue_work";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列

// true 自动确认:无论是否成功 都认为成功消费

// false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [x] Received "" + message + """);

// 休眠1秒 模拟数据量 接收间隔比Recv1长 模拟性能(压力)弱的服务器

Thread.sleep(1000);

//下面这行注释掉表示使用自动确认模式 开启表示手动

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 15:23

*/

public class Recv {

private final static String QUEUE_NAME = "test_queue_work";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列

// true 自动确认:无论是否成功 都认为成功消费

// false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [y] Received "" + message + """);

//休眠 模拟数据量 接收间隔比Recv2短 模拟性能(压力)强的服务器

Thread.sleep(10);

// 返回确认状态,注释掉表示使用自动确认模式 开启表示手动

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

模式1:自动确认

只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

模式2:手动确认

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

发布订阅模式

发送

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* 发布订阅模式

* @author yezixun

* @date 2019/11/7 15:54

*/

public class Send {

private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明exchange

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 消息内容

String message = "Hello World!";

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

System.out.println(" [x] Sent "" + message + """);

channel.close();

connection.close();

}

}

接收

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 15:55

*/

public class Recv {

private final static String QUEUE_NAME = "test_queue_work1";

private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列,手动返回完成

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [Recv] Received "" + message + """);

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 15:55

*/

public class Recv2 {

private final static String QUEUE_NAME = "test_queue_work2";

private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列,手动返回完成

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [Recv2] Received "" + message + """);

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

路由模式

发送

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* 路由模式

* @author yezixun

* @date 2019/11/7 16:26

*/

public class Send {

private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

// 消息内容

String message = "添加";

channel.basicPublish(EXCHANGE_NAME, "select",null, message.getBytes());

//String message = "删除";

//channel.basicPublish(EXCHANGE_NAME, "delete",null, message.getBytes());

System.out.println(" [x] Sent "" + message + """);

channel.close();

connection.close();

}

}

接收

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 15:23

*/

public class Recv {

private final static String QUEUE_NAME = "test_queue_direct_1";

private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定队列到交换机 删除和修改

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

//定义队列消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列

// true 自动确认:无论是否成功 都认为成功消费

// false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [y] Received "" + message + """);

// 返回确认状态,注释掉表示使用自动确认模式 开启表示手动

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 15:23

*/

public class Recv2 {

private final static String QUEUE_NAME = "test_queue_direct_2";

private final static String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定队列到交换机 查询和增加

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select");

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

//定义队列消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列

// true 自动确认:无论是否成功 都认为成功消费

// false 手动确认:消费者成功消费后反馈,不反馈消息将一直处于不可用状态

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [y] Received "" + message + """);

// 返回确认状态,注释掉表示使用自动确认模式 开启表示手动

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

主题模式

发送

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 16:45

*/

public class Send {

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明exchange

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

// 消息内容

String message = "Hello World!!";

channel.basicPublish(EXCHANGE_NAME, "routekey.", null, message.getBytes());

System.out.println(" [x] Sent "" + message + """);

channel.close();

connection.close();

}

}

接收

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 16:46

*/

public class Recv {

private final static String QUEUE_NAME = "test_queue_topic_work_1";

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列,手动返回完成

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [Recv_x] Received "" + message + """);

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import com.zhonglouguairen.util.ConnectionUtil;

/**

* @author yezixun

* @date 2019/11/7 16:46

*/

public class Recv2 {

private final static String QUEUE_NAME = "test_queue_topic_work_2";

private final static String EXCHANGE_NAME = "test_exchange_topic";

public static void main(String[] argv) throws Exception {

// 获取到连接以及mq通道

Connection connection = ConnectionUtil.getConnection();

Channel channel = connection.createChannel();

// 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 绑定队列到交换机

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

// 同一时刻服务器只会发一条消息给消费者

channel.basicQos(1);

// 定义队列的消费者

QueueingConsumer consumer = new QueueingConsumer(channel);

// 监听队列,手动返回完成

channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println(" [Recv2_x] Received "" + message + """);

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

————————————————

版权声明:本文中大部分为CSDN博主「niaobirdfly」的原创 详细信息参考原作者

原文链接:https://blog.csdn.net/hellozpc/article/details/81436980

以上是 RabbitMQ常用模式快速上手 的全部内容, 来源链接: utcz.com/z/510519.html

回到顶部