RabbitMQ入门

编程

本章主要介绍RabbitMQ 的基本概念

rabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

Producer: 生产者,就是投递消息的一方。

消息一般可以包含2个部分:消息体和标签(Label)。消息体也可以称为 payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。

标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由 RabbitMQ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者(Consumer)

Consumer: 消费者,就是接收消息的一方。

消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。

Broker: 消息中间件的服务节点。

对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台服务器。

生产者将消息存入 RabbitMQ Broker,以及消费者从 Broker 中消费数据的整个流程:

首先生产者将业务方数据进行可能的包装,之后封装成消息,发送 (AMQP 协议里这个动作对应的命令为 Basic.Publish) 到 Broker 中。消费者订阅并接收消息(AMQP 协议里这个动作对应的命令为 Basic.Consumer 或者 Basic.Get),经过可能的解包处理得到原始的数据,之后再进行业务处理逻辑。这个业务处理逻辑并不一定需要和接收消息的逻辑使用同一个线程。消费者进程可以使用一个线程去接收消息,存入到内存中,比如使用JAVA 中的BlockingQueue。业务处理逻辑使用另一个线程从内存中读取数据,这样可以将应用进一步解耦,提高整个应用的处理效率。

队列:

Queue: 队列,是 RabbitMQ 的内部对象,用于存储消息。

RabbitMQ 中消息都只能存储在队列中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题)这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。

多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做。

交换器、路由键、绑定:

Exchange: 交换器。

生产者将消息发送到 Exchange(交换器,通常也可以用大写的"X"来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。

RabbitMQ 中的交换器有四种类型,不同的类型有着不同的路由策略。(Exchange Type)

RoutingKey: 路由键。生产者将消息发给交换器的时候,一般会指定一个 RoutingKey, 用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。

在交换器类型和绑定键(BindingKey)固定的情况下,生产者可以在发送消息给交换器时,通过指定 RoutingKey 来决定消息流向哪里。

Binding: 绑定。RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey), 这样 RabbitMQ 就知道如何正确地将消息路由到队列了。

生产者将消息发送给交换器时,需要一个 RoutingKey, 当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应队列中。在绑定多个队列到同一个交换器时,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如 fanout 类型的交换器就会无视 BindingKey,而是将消息路由到所有绑定到该交换器的队列中。

交换器类型:

RabbitMQ 常用的交换器类型有 fanout、direct、topic、headers 这四种。AMQP 协议里还提到另外两种类型:System和自定义,这里不予描述。

fanout (展开、分列)

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

direct(管理、直接的)

direct 类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey 和 RoutingKey 完全匹配的队列中。

topic(主题)

topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

  • RoutingKey 为一个点号 "." 分隔的字符串(被点号 "." 分隔开的每一段独立的字符串称为一个单词),如 ”com.rabbitmq.client“、"java.util.concurrent"、"com.hidden.client";
  • BindingKey 和 RoutingKey 一样也是点号 "." 分隔的字符串;
  • BindingKey 中可以存在两种特殊字符串 "*" 和 "#",用于做模糊匹配,其中 ”*“ 用于匹配一个单词,"#" 用于匹配多规格单词(可以是零个)。

headers

headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。

RabbitMQ 的运转

我们知道无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条TCP 连接,也就是 Connection。一旦TCP连接建立起来,客户端紧接着可以创建一个 AMQP信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是许多个TCP连接。然而对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。RabbitMQ采用类似 NIO (Non-blocking I/O)的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理

每个线程把持一个信道,所以信道复用了 Connection 的TCP连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相关的调优策略需要根据业务自身的实际情况进行调节。

AMQP 协议介绍

RabbitMQ 就是 AMQP 协议的 Erlang 的实现。AMQP 的模型架构和 RabbitMQ 的模型架构是一样的。

AMQP 协议本身包括三层:

  • Module Layer: 位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用 Queue.Declare 命令声明一个队列或者使用Basic.Consumer 订阅一个队列中的消息。
  • Session Layer: 位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
  • Transport Layer: 位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP 说到底还是一个通信协议,通信协议都会涉及报文交互

AMQP 生产者流转过程:

Connection connection = factory.newConnection();//创建连接

Channel channel = connection.createChannel();//创建信道

String message = "hello world!";

channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

//关闭资源

channel.close();

connection.close();

 

 

 

 

 

 

 

 

 

 

以上是 RabbitMQ入门 的全部内容, 来源链接: utcz.com/z/512079.html

回到顶部