Rocketmq学习总结
Consumer:消息消费者,与生产者类似,获取到对应的broker后,与broker建立通道,进行消息消费
NameService:理解为一个注册中心,存储当前集群所有broker信息、topic跟broker的对应关系。通过提供心跳包的形式来检测broker是否存活
Broker:集群的核心模块,主要负责topic消息存储,消费者的消费位点管理。跟所有的nameSerice保持长连接,定时发送心跳包
NameService
- 可以理解为简化的zk,起到一个注册中心的作用
- 区别与ZK是他没有监听的概念,而是通过心跳包来维持自己与Broker之间的关系
- NameService集群之间的每个节点互相之间没有通信,是无状态的
- NameService的压力不会太大,主要是维护Topic-Broker之间的映射关系
- 但若是broker中的topic信息量太大,broker向nameService注册信息的时候会导致传输时间过长超时,NameService会误判认为Broker下线
Broker
- 每台broker节点与所有的nameService保持长连接及心跳,并定时将Topic信息注册到nameService中
- 每个topic默认创建4个队列,相同的队列中保证顺序消费
- Broker同样分为master和salve,相同的BrokerName,不同的BrokerId,一个master对应多个salve,一个salve只对应一个master
- Broker上存存topic信息,topic由多个队列组成,队列会均匀分布到所有的broker上
- Producer在发送消息时,会尽量平均分布到队列中,这样保证最终所有的消息在broker上是平均分配的
Producer
- producer与随机的一个nameService节点建立长连接,定期从nameSerive中拉取topic-broker的映射信息
- 与提供topic的broker master建立一个长连接,producer每隔30秒向broker 发送一个心跳,broker每隔10秒扫描一下存活的链接
- Producer发送消息支持三种模式
- 同步
- 异步
- 单向
Comsumer
- comsumer同样采用集群部署,支持pull、push两种消费模式
- comsumer可分为广播消息消费和集群消费
pull和push消费模式
- pull是主动型消费,即能从服务器拉取到数据就开始消费
- 首先通过打算消费的topic拿到MessageQueue中的集合消息,然后遍历拿取,并记录下次取消息时的offset位
- push是被动型消费,多了一个注册消费监听器,本质还是从服务器拉取数据,但是要等到消费监听器被触发以后,才会进行消费
- push方式中,注册MessageListener监听器,取到消息后,唤醒MessageListener中的consumerMessage()来消费
集群消费和广播消费
消费同一类消息(相同topic和相同tag)的comsumer组成的group为一个consumer集群。
集群消费(默认模式)
- 集群消费保证每个消费者集群内的consumer只会对同一条消息消费一次
- 集群消费的消费进度是保存在broker上的,这样的好处是,无论消费者集群的扩大还是缩小,消息重复概率很低
- 集群消费是有消息失败重投的机制,可靠性更高
广播消费
- 同一个消费集群下的所有消费者实例都会消费一次消息
- 消费进度是保存在consumer上的,会出现消息重复
- 广播消费是没有消息失败重投的机制
Rocketmq刷盘策略
所有消息都是持久化的,先写入pagecache区,再写入磁盘,保证磁盘和内存均有一份数据,读取时读取内存数据
使用哪种刷盘方式可以调整broker配置文件中的
flushType = SYNC_FLUSH or ASYNC_FLUSH
- 同步刷盘
- 消息存储磁盘后才会返回成功
- 当消息存入pagecache区域时,立即通知刷盘线程,完成刷盘工作后,返回成功
- 同步刷盘更稳定,但是吞吐较低,适用于要求消息可靠性更高的场景
- 异步刷盘
- 消息存入pagecache区,即返回成功,当内存区域数据达到一定容量时,统一写入磁盘
- 异步刷盘高吞吐,写操作返回快
- 意外情况下断电,会导致pagecache区域尚未刷入磁盘的部分数据丢失,但是吞吐性更高
Rocketmq复制策略
当broker以集群形式分布,需要进行消息的主从同步时,会使用到复制策略
同步复制
- master和salve均写入成功后,返回成功
- master和salve数据同步,不易丢失,但是吞吐相对较低
异步复制
- master数据写入成功后,立即返回成功
- master莫名其妙宕机后,可能会出现master和salve的数据不一致的情况,吞吐性能更高
建议推荐方式:异步刷盘+同步复制
rocketMq消息丢失场景及解决方案
- 生产者将消息发送给mq途中,因出现网络抖动,导致消息丢失
- 消息存储在pagecache区,且尚未触发异步刷盘,而出现断电一类,导致数据丢失。或是存入磁盘后,磁盘损坏导致数据丢失
- Consumer从mq中拿取数据,尚未完成消费,就通知mq消费完毕,然后消费者宕机,导致消息丢失
解决方案
场景一:
- 基于生产者的分布式事务来解决
- 若是消息推送mq过程中丢失,则执行回滚操作
- 生产者发送完消息以后,mq即使接收到响应成功后,暂时消费者也不会消费的(此时处于半消息状态)
- 生产者会执行自己的链路,若是执行完毕且成功,会再次通知mq将消息commit(二次确认机制),否则进行rollback操作
场景二:
将异步刷盘改为同步刷盘,同时对于broker进行集群化部署,进行主从复制策略
场景三:
- mq会在消费端注册一个监听,当consumer拿去到消息消费时,只有消费成功后,才会发送一个COMSUME_SUCCESS的状态,mq会知道消费成功(类似与一个ACK的确认机制)
- 当节点挂掉时,rocketmq长时间收不到响应(监听也没了),就会进行故障转移,将消息发给其他消费者处理
顺序消费与并行消费(push模式下具备)
- 顺序消费必须保证需要顺序处理的消息在同一个队列,具体实现方式是通过messageQueueSelector类实现(通过hash算法,将同一个oderId的数据,放入同一个队列中)
- 消费该队列的consumer只能有一个
- 实现高吞吐的方式是,对于多个不同的订单,可以多开队列,进行并行消费
消息的重复消费问题及措施
出现消息的重复消费的原因是因为我们的rocketmq支持失败重试的机制,一些极端情况下,例如消费超时,或者mq没有收到消费端的ACK确认码,将消息发给其他消费者而出现的重复问题
- 针对普通场景,建立一个消息表。对于每条消息,创建唯一的标识,这样避免相同的消息出现重复消费
- 针对并发较高的场景,可以通过redis来代替消息表
- 甚至可以考虑布隆过滤器,但是布隆过滤器存在一定的误报风险,当误报时,会认为该条消息已存在(实际不存在),导致正常消息无法被消费
以上是 Rocketmq学习总结 的全部内容, 来源链接: utcz.com/z/514130.html