消息队列实战(一) 基础

一、消息模型:主题和队列

RabbitMQ 的消息模型

图片名称

RabbitMQ是少数依然坚持使用队列模型的产品之一,在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中

RocketMQ 的消息模型

在 RocketMQ 中既有发布订阅,也有队列

几乎所有的消息队列产品都使用一种非常朴素的请求 - 确认机制,确保消息不会在传递过程中由于网络或服务器故障丢失。

在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;

在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。

但是,为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则。 也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,RocketMQ 在主题下面增加了队列的概念。
每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。

图片名称

RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响。也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。

在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。

例如有一个topic中有5个队列,分布在两个broker上:
图片名称
生产者不用对应队列,可以轮询发送,也可以随机发
由于消费确认机制的限制,在同一个消费组里面,每个队列只能被一个消费者实例占用。
每个消费组内部维护自己的一组消费位置,每个队列对应一个消费位置。消费位置在服务端保存,并且,消费位置和消费者是没有关系的。

Kafka 的消息模型

Kafka 的消息模型和 RocketMQ 是完全一样的。唯一的区别是,在 Kafka 中,队列叫分区(Partition)。

二、利用事务消息实现分布式事务

消息队列中的“事务”,主要解决的是消息生产者和消息消费者的数据一致性问题。

例如创建订单后删除购物车商品

图片名称

创建了订单,没有清理购物车;订单没创建成功,购物车里面的商品却被清掉了。在上述任意步骤都有可能失败的情况下,还要保证订单库和购物车库这两个库的数据一致性。

对于购物车系统收到订单创建成功消息清理购物车这个操作来说,只要成功执行购物车清理后再提交消费确认即可,如果失败,由于没有提交消费确认,消息队列会自动重试。

问题的关键点集中在订单系统,创建订单和发送消息这两个步骤要么都操作成功,要么都操作失败, 不允许一个成功而另一个失败的情况出现。

消息队列是如何实现分布式事务的?

在实际应用中,比较常见的分布式事务实现有 2PC(Two-phase Commit,也叫二阶段提交)、TCC(Try-Confirm-Cancel) 和事务消息。
事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。

图片名称

半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
如果在第四步提交事务消息时失败了怎么办? 对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。

RocketMQ 中的分布式事务实现

在 RocketMQ 中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。

如果 Producer 在提交或者回滚事务消息时发生网络异常,RocketMQ的 Broker 没有收到提交或者回滚的请求,Broker 会定期去 Producer 上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。

为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知RocketMQ 本地事务是成功还是失败。 反查本地事务的逻辑也很简单,我们只要根据消息中的订单 ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。

这个反查本地事务的实现,并不依赖消息的发送方, 也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ 依然可以通过其他订单服务的节点来执行反查,确保事务的完整性。

三、如何确保消息不会丢失?

检测消息丢失的方法

我们可以利用消息队列的有序性来验证是否有消息丢失。 原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。

如果你的系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识, 在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

确保消息可靠传递

图片名称

生产阶段: 从消息在 Producer 创建出来,经过网络传输发送到 Broker端。

存储阶段: 消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

消费阶段: Consumer 从 Broker 上拉取消息,经过网络传输发送到Consumer 上。

生产阶段:

在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。

存储阶段:

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给Producer 返回确认响应, 这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。

如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到2 个以上的节点,再给客户端回复发送确认响应。 这样当某个 Broker 宕机时,其他的Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段:

不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

四、如何处理消费过程中的重复消息?

At most once: 至多一次。消息在传递时,最多会被送达一次。没什么消息可靠性保证,允许丢消息。 一般都是一些对消息可靠性要求不太高的监控场景使用,比如每分钟上报一次机房温度数据,可以接受数据少量丢失。

At least once: 至少一次。消息在传递时,至少会被送达一次。不允许丢消息,但是允许有少量重复消息出现。

Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

用幂等性解决重复消息问题

其任意多次执行所产生的影响均与一次执行的影响相同
At least once + 幂等消费 = Exactly once
实现幂等最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。

  1. 利用数据库的唯一约束实现幂等(唯一索引)

    不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOTEXIST”语义的存储类系统都可以用于实现幂等,比如,可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

  2. 为更新的数据设置前置条件(数据库乐观锁)

    给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。

    更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

  3. 记录并检查操作
    也称为Token 机制或者 GUID(全局唯一 ID)机制。执行数据更新操作之前,先检查一下是否执行过这个更新操作。

    在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。

    问题:

    1.给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿, 方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。

    2.在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等, 否则就会出现 Bug。 对于这个问题,当然我们可以用事务来实现,也可以用锁来实现。

五、消息积压了该如何处理?

优化性能来避免消息积压

  1. 发送端性能优化

    如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的

  2. 消费端性能优化

    一定要保证消费端的消费性能要高于生产端的发送性能, 这样的系统才能健康的持续运行。
    最常用的手段就是扩容,因为消费者都是无状态的

    在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
    图片名称

    上面的处理方法是错误的,它收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。 然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑, 这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。

    但是,这样会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失。

消息积压了该如何处理?

还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。

短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级, 通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要

业务。

还有一种情况,通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,可能原因是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。

在消费端是否可以通过批量消费的方式来提升消费性能?

1、要求消费端能够批量处理或者开启多线程进行单条处理

2、批量消费一旦某一条数据消费失败会导致整批数据重复消费

3、对实时性要求不能太高,批量消费需要Broker积累到一定消费数据才会发送到Consumer

六、如何实现单个队列的并行消费?

比如说,队列中当前有 10 条消息,对应的编号是 0-9,当前的消费位置是 5。同时来了三个消费者来拉消息,把编号为 5、6、7 的消息分别给三个消费者,每人一条。

过了一段时间,三个消费成功的响应都回来了,这时候就可以把消费位置更新为 8 了,这样就实现并行消费。

这是理想的情况。还有可能编号为 6、7 的消息响应回来了,编号 5 的消息响应一直回不来,这个位置 5 就是一个消息空洞。

为了避免位置 5 把这个队列卡住,可以先把消费位置 5 这条消息,复制到一个特殊重试队列中,然后依然把消费位置更新为 8,继续消费。再有消费者来拉消息的时候,优先把重试队列中的那条消息给消费者就可以了。

七、如何保证消息的严格顺序?

topic层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。

如果说,你的业务必须要求全局严格顺序,就只能把消息队列数配置成1,生产者和消费者也只能是一个实例, 这样才能保证全局严格顺序。

大部分情况下,我们并不需要全局严格顺序,只要保证局部有序就可以满足要求了。 比如,在传递账户流水记录的时候,只要保证每个账户的流水有序就可以了,不同账户之间的流水记录是不需要保证顺序的。

可以这样来实现局部严格顺序,在发送端,使用账户 ID 作为 Key,采用一致性哈希算法计算出队列编号,指定队列来发送消息。 如果不考虑队列扩容,也可以用队列数量取模的简单方法来计算队列编号。

以上是 消息队列实战(一) 基础 的全部内容, 来源链接: utcz.com/a/33384.html

回到顶部