RabbitMQ安装、基本特性API使用教程

一、初识RabbitMQ

是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

AMQP协议Advanced Message Queuing Protocol(高级消息队列协议)

 定义:具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,

是应用层协议的一个开放标准,为面向消息中间件设计。

RabbitMQ简介、安装、基本特性API--Java测试

AMQP专业术语:

  • Server:又称broker,接受客户端的链接,实现AMQP实体服务
  • Connection:连接,应用程序与broker的网络连接
  • Channel:网络信道,几乎所有的操作都在channel中进行,Channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
  • Message:消息,服务器与应用程序之间传送的数据,由Properties和Body组成.Properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;Body则是消息体内容。
  • virtualhost: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有相同名称的Exchange 或 Queue。
  • Exchange:交换机,接收消息,根据路由键转单消息到绑定队列
  • Binding:  Exchange和Queue之间的虚拟链接,binding中可以包换routing key
  • Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)

RabbitMQ整体架构

RabbitMQ简介、安装、基本特性API--Java测试

Exchange和队列是多对多关系,实际操作一般为1个exchange对多个队列,为避免设计过于复杂.

二、单机版快速安装

  • 1、首先在Linux上进行一些软件的准备工作

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

  • 2、下载安装必要的软件
  • 备用地址:

  • https://www.rabbitmq.com/install-rpm.html#downloads
  • https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/

wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.4/erlang-23.0.4-1.el7.x86_64.rpm

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm

  • 3、安装服务命令

rpm -ivh erlang-23.0.4-1.el7.x86_64.rpm

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm​

  • 4、启动

启动服务

systemctl start rabbitmq-server

查看是否启动

lsof -i:5672

  • 5、启动、安装web管理插件(管控台)

rabbitmq-plugins enable rabbitmq_management

  • 6、查看管理端口有没有启动

lsof -i:15672

或者:

netstat -tnlp | grep 15672

  • 7、添加用户

#添加用户 用户名 admin 密码 admin web管理工具可用此用户登录

sudo rabbitmqctl add_user admin admin

#设置用户角色 管理员

sudo rabbitmqctl set_user_tags admin administrator

#设置用户权限(接受来自所有Host的所有操作)

sudo rabbitmqctl set_permissions -p / admin "." "." ".*"

#查看用户权限

sudo rabbitmqctl list_user_permissions admin

  • 重新启动

systemctl start rabbitmq-server

rabbitmq-plugins enable rabbitmq_management

  • 访问:http://192.168.2.121:15672/ 使用 admin 登录

RabbitMQ简介、安装、基本特性API--Java测试

  • 代码测试

  1. 引入依赖

        <dependency>

<groupId>com.rabbitmq</groupId>

<artifactId>amqp-client</artifactId>

<version>3.6.5</version>

</dependency>

2.发送端:

package com.zhouhong.rabbitmq.api.helloworld;

import java.util.HashMap;

import java.util.Map;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Sender {

public static void main(String[] args) throws Exception {

// 1 创建ConnectionFactory

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setHost("192.168.2.121");

connectionFactory.setPort(5672);

connectionFactory.setUsername("admin");

connectionFactory.setPassword("admin");

connectionFactory.setVirtualHost("/");

// 2 创建Connection

Connection connection = connectionFactory.newConnection();

// 3 创建Channel

Channel channel = connection.createChannel();

// 4 声明

String queueName = "test001";

// 参数: queue名字,是否持久化,独占的queue(仅供此连接),不使用时是否自动删除, 其他参数

channel.queueDeclare(queueName, false, false, false, null);

Map<String, Object> headers = new HashMap<String, Object>();

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()

.deliveryMode(2)

.contentEncoding("UTF-8")

.headers(headers).build();

for(int i = 0; i < 5;i++) {

String msg = "Hello World RabbitMQ " + i;

channel.basicPublish("", queueName , props , msg.getBytes());

}

}

}

3.接收端

package com.zhouhong.rabbitmq.api.helloworld;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Receiver {

public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.2.121");

connectionFactory.setPort(5672);

connectionFactory.setPassword("admin");

connectionFactory.setUsername("admin");

connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);

connectionFactory.setNetworkRecoveryInterval(3000);

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

String queueName = "test001";

// durable 是否持久化消息

channel.queueDeclare(queueName, false, false, false, null);

QueueingConsumer consumer = new QueueingConsumer(channel);

// 参数:队列名称、是否自动ACK、Consumer

channel.basicConsume(queueName, true, consumer);

// 循环获取消息

while(true){

// 获取消息,如果没有消息,这一步将会一直阻塞

Delivery delivery = consumer.nextDelivery();

String msg = new String(delivery.getBody());

System.out.println("收到消息:" + msg);

}

}

}

4.结果(先启动接收端进行监控,再启动发送端)

收到消息:Hello World RabbitMQ 0

收到消息:Hello World RabbitMQ 1

收到消息:Hello World RabbitMQ 2

收到消息:Hello World RabbitMQ 3

收到消息:Hello World RabbitMQ 4

三、RabbitMQ----交换机

RabbitMQ简介、安装、基本特性API--Java测试

  1. Name:交换机名称。
  2. Type:交换机类型 direct、topic、fanout、headers。
  3. Durability:是否持久化,ture为持久化。
  4. Auto Delete :当最后一个绑定道Exchange上的队列删除后,自动删除该Exchange。
  5. Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False。
  6. Arguments:扩展参数,用于扩展AMQP协议自制定化使用。
  7. DirectExchange的消息被转发道RouteKey中指定的Queue。

交换机-----Direct exchange

Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

代码:

  • 发送端

package com.zhouhong.rabbitmq.api.exchange.direct;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Sender4DirectExchange {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setHost("192.168.2.121");

connectionFactory.setPort(5672);

connectionFactory.setPassword("admin");

connectionFactory.setUsername("admin");

connectionFactory.setVirtualHost("/");

//2 创建Connection

Connection connection = connectionFactory.newConnection();

//3 创建Channel

Channel channel = connection.createChannel();

//4 声明

String exchangeName = "test_direct_exchange";

//必须要和接收端 routingKey 一一对应

String routingKey = "test_direct_routingKey";

//5 发送

String msg = "Hello World RabbitMQ 4 Direct Exchange Message ... ";

channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());

}

}

  •  接收端

package com.zhouhong.rabbitmq.api.exchange.direct;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Receiver4DirectExchange {

public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.2.121");

connectionFactory.setPort(5672);

connectionFactory.setPassword("admin");

connectionFactory.setUsername("admin");

connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);

connectionFactory.setNetworkRecoveryInterval(3000);

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

//4 声明

String exchangeName = "test_direct_exchange";

String exchangeType = "direct";

String queueName = "test_direct_queue";

String routingKey = "test_direct_routingKey";

channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

channel.queueDeclare(queueName, false, false, false, null);

channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息

QueueingConsumer consumer = new QueueingConsumer(channel);

//参数:队列名称、是否自动ACK、Consumer

channel.basicConsume(queueName, true, consumer);

//循环获取消息

while(true){

//获取消息,如果没有消息,这一步将会一直阻塞

Delivery delivery = consumer.nextDelivery();

String msg = new String(delivery.getBody());

System.out.println("收到消息:" + msg);

}

}

}

交换机-----topic exchange

exchange 将Routekey和某个topic进行一个模糊匹配,发送给对应队列、可以用通配符进行匹配

RabbitMQ简介、安装、基本特性API--Java测试

比如下面例子

RabbitMQ简介、安装、基本特性API--Java测试

代码:

  • 接收端

package com.zhouhong.rabbitmq.api.exchange.topic;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Receiver4TopicExchange1 {

public static void main(String[] args) throws Exception {

ConnectionFactory connectionFactory = new ConnectionFactory() ;

connectionFactory.setHost("192.168.2.121");

connectionFactory.setPort(5672);

connectionFactory.setPassword("admin");

connectionFactory.setUsername("admin");

connectionFactory.setVirtualHost("/");

connectionFactory.setAutomaticRecoveryEnabled(true);

connectionFactory.setNetworkRecoveryInterval(3000);

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

//4 声明

String exchangeName = "test_topic_exchange";

String exchangeType = "topic";

String queueName = "test_topic_queue";

// 只能匹配一个 例如:user.txt、user.py都可以,但是user.txt.py 不行

//String routingKey = "user.*";

// user.txt、user.py 、user.txt.py 都可以匹配到

String routingKey = "user.#";

channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

channel.queueDeclare(queueName, false, false, false, null);

channel.queueBind(queueName, exchangeName, routingKey);

//durable 是否持久化消息

QueueingConsumer consumer = new QueueingConsumer(channel);

// 参数:队列名称、是否自动ACK、Consumer

channel.basicConsume(queueName, true, consumer);

System.err.println("consumer1 start.. ");

// 循环获取消息

while(true){

// 获取消息,如果没有消息,这一步将会一直阻塞

Delivery delivery = consumer.nextDelivery();

String msg = new String(delivery.getBody());

System.out.println("收到消息:" + msg + ", RoutingKey: " + delivery.getEnvelope().getRoutingKey());

}

}

}

  • 发送端

package com.zhouhong.rabbitmq.api.exchange.topic;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class Sender4TopicExchange {

public static void main(String[] args) throws Exception {

//1 创建ConnectionFactory

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setHost("192.168.2.121");

connectionFactory.setPort(5672);

connectionFactory.setPassword("admin");

connectionFactory.setUsername("admin");

connectionFactory.setVirtualHost("/");

//2 创建Connection

Connection connection = connectionFactory.newConnection();

//3 创建Channel

Channel channel = connection.createChannel();

//4 声明

String exchangeName = "test_topic_exchange";

String routingKey1 = "user.save";

String routingKey2 = "user.update";

String routingKey3 = "user.delete.abc";

//5 发送

String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";

channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());

channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());

channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());

channel.close();

connection.close();

}

}

交换机-----Fanout exchange 广播模式

1.不处理路由键,只需要简单的将队列绑定到交换机上。

2.发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。

3.Fanout交换机转发消息是最快的。

RabbitMQ简介、安装、基本特性API--Java测试

代码:见示例文章开始GitHub地址

四、RabbitMQ高级特性

1、消息如何保障 100% 的投递成功

生产端的可靠性投递的标志:

1、消息成功发出

2、mq节点成功接收

3、发送端MQ节点确认应答

4、完善的消息补偿机制

解决:消息信息落库,对消息状态进行打标

RabbitMQ简介、安装、基本特性API--Java测试

幂等性

    1、 select count(1) from t_order where id = 唯一id(或)指纹码

    2、唯一id或指纹码机制,利用数据库主键去重

2、Confirm

RabbitMQ简介、安装、基本特性API--Java测试

第一步:再channel上开启确认模式:channel.confirmSelect();

第二步:再channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日期等后续处理!

3、return消息机制

ReturnListener用于处理不可路由的消息

RabbitMQ简介、安装、基本特性API--Java测试

我们的消息生产者,通过指定一个Exchage和Routingkey,把消息送达某一个队列中去,然后我们的消费者监听队列,进行消费处理操作,如果没有合适的队列,则会由returnListener进行接受。

Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。

4、消费端ACK与重回队列

消费端ACK:

  • 在工作的时候一般不会选择自动ack
  • 消费端的手工ack分为两种ACK和NACK
  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。这种建议回复NACK,不要重回队列
  • 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功

消费端的重回队列

  • 是为了对没有处理成功的消息,把消息重新会投递给broker。
  • 重回队列,会回到队列的尾部
  • 也会造成一条消息一直重复投递,死循环了
  • 在实际应用中,都会关闭重回队列,也就是设置为false

5、TTL队列和消息

TTL: time to live的缩写,也就是生存时间。

  • RabbitMQ 支持消息过期时间,在消息发送时可以进行指定
  • RabbitMQ支持队列过期时间,从消息入队列开始计算,只要超过了队列的超时间时间配置,那么消息会自动的清除

死队列: DLX,Dead-Letter-Exchange

  • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX.

消息变成死信的几种情况

  • 消息被拒绝 并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

DLX也是一个正常的Exchange,实际上是一个属性控制

  • 当队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上,进而被路由到另一个队列.
  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补rabbitMQ3.0以前的immediate参数功能。
  • 在正常队列上添加参数:arguments.put("x-dead-letter-exchange","dlx.exchange");这样消息过期、requeue、队列达到最大长度时,就可以直接路由到死信队列。

以上是 RabbitMQ安装、基本特性API使用教程 的全部内容, 来源链接: utcz.com/a/120037.html

回到顶部