【Java】深入学习RocketMQ之快速入门

深入学习RocketMQ之快速入门

又坏又迷人发布于 今天 12:38

RocketMQ - 整体介绍

简介

  • RocketMQ是一款分布式、队列模型的消息中间件。
  • 支持集群模型、负载均衡、水平扩展能力。
  • 采用零拷贝的原理、顺序写盘、随机读。
  • 代码优秀,底层通信框架使用 Netty 。
  • 强调集群无单点,可扩展,任意一点高可用,水平可扩展。
  • 消息失败重试机制、消息可查询。

RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件,具有以下特性:

  1. 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型。
  2. 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递。
  3. 支持拉(pull)和推(push)两种消息模式。
  4. 单一队列百万消息的堆积能力。
  5. 支持多种消息协议,如 JMS、MQTT 等。
  6. 分布式高可用的部署架构,满足至少一次消息传递语义。
  7. 提供 docker 镜像用于隔离测试和云集群部署。
  8. 提供配置、指标和监控等功能丰富的 Dashboard。

概念模型

Producer:消息生产者,负责生产消息,一般由业务系统负责产生消息。

Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

Push Consumer:Consumer的一种,需要向Consumer对象注册监听。

Pull Consumer:Consumer的一种,需要主动请求Broker拉取消息。

Producer Group:生产者集合,一般用于发送一类消息。

Consumer Group:消费者集合,一般用于接收一类消息进行消费。

Broker:MQ消息服务(中转角色,用于消息存储于生产消息转发)。

环境搭建

环境:JDK8、Centos7、RocketMQ 4.3

首先我们编辑Hosts

vim /etc/hosts

加入下面两句话,修改为你自己的ip。

192.168.3.160 rocketmq-nameserver1

192.168.3.160 rocketmq-master1

随后我们将RocketMQ tar.gz传入服务器。

传入之后我们创建文件夹。

# 创建文件夹

mkdir /usr/local/apache-rocketmq

# 然后解压

tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq

# 建立软连接

ln -s apache-rocketmq rocketmq

创建存储路径。

mkdir /usr/local/rocketmq/store

mkdir /usr/local/rocketmq/store/commitlog

mkdir /usr/local/rocketmq/store/consumequeue

mkdir /usr/local/rocketmq/store/index

修改配置文件。

vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties

brokerClusterName=rocketmq-cluster

#broker 名字,注意此处不同的配置文件填写的不一样

brokerName=broker-a

#0 表示 Master,>0 表示 Slave

brokerId=0

#nameServer 地址,分号分割 一定要和我们配置的hosts里的相同

namesrvAddr=rocketmq-nameserver1:9876

#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数

defaultTopicQueueNums=4

#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭

autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口

listenPort=10911

#删除文件时间点,默认凌晨 4 点

deleteWhen=04

#文件保留时间,默认 48 小时

fileReservedTime=120

#commitLog 每个文件的大小默认 1G

mapedFileSizeCommitLog=1073741824

#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整

mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000

#redeleteHangedFileInterval=120000

#检测物理文件磁盘空间

diskMaxUsedSpaceRatio=88

#存储路径

storePathRootDir=/usr/local/rocketmq/store

#commitLog 存储路径

storePathCommitLog=/usr/local/rocketmq/store/commitlog

#消费队列存储路径存储路径

storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue

#消息索引存储路径

storePathIndex=/usr/local/rocketmq/store/index

#checkpoint 文件存储路径

storeCheckpoint=/usr/local/rocketmq/store/checkpoint

#abort 文件存储路径

abortFile=/usr/local/rocketmq/store/abort

#限制的消息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker 的角色

#- ASYNC_MASTER 异步复制 Master

#- SYNC_MASTER 同步双写 Master

#- SLAVE

brokerRole=ASYNC_MASTER

#刷盘方式

#- ASYNC_FLUSH 异步刷盘

#- SYNC_FLUSH 同步刷盘

flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false

#发消息线程池数量

#sendMessageThreadPoolNums=128

#拉消息线程池数量

#pullMessageThreadPoolNums=128

修改日志配置文件。

mkdir -p /usr/local/rocketmq/logs

cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

修改启动脚本参数。

vim /usr/local/rocketmq/bin/runbroker.sh

#==============================================================================

# 开发环境 JVM Configuration

#==============================================================================

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g

vim /usr/local/rocketmq/bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:PermSize=128m -

XX:MaxPermSize=320m"

启动 NameServer

cd /usr/local/rocketmq/bin

nohup sh mqnamesrv &

# 使用jps查看

[[email protected] bin]# jps

22321 NamesrvStartup

22335 Jps

启动BrokerServer

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

# jps查看

[[email protected] bin]# jps

22321 NamesrvStartup

22535 Jps

22440 BrokerStartup

控制台使用

下载代码:https://github.com/apache/roc...

打开rocketmq-console。

修改properties

rocketmq.config.namesrvAddr=192.168.3.160:9876

启动代码,访问localhost:8080

【Java】深入学习RocketMQ之快速入门

RocketMQ - 急速入门

生产者使用

首先我们创建一个SpringBoot项目,引入RocketMQ依赖。

<dependency>

<groupId>org.apache.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>4.3.0</version>

</dependency>

创建一个Producer类

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Producer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");

producer.setNamesrvAddr(NAME_SRV_ADDR);

producer.start();

for (int i = 0; i < 5; i++) {

//1.创建消息

Message message = new Message("test_quick_topic",//主题

"TagA_" + i, // 标签

"KeyA_" + i, //用户自定义key,唯一标识

"Hello RocketMQ".getBytes());//消息内容实体

//2.发送消息

SendResult result = producer.send(message);

System.out.println("消息发送结果:" + result);

}

producer.shutdown();

}

}

点击运行后,我们可以在控制台看到5条结果已经发送成功。

【Java】深入学习RocketMQ之快速入门

我们打开web管理界面可以看到今天有五条消息进来。

【Java】深入学习RocketMQ之快速入门

并且在Message里我们可以看到五条消息。

【Java】深入学习RocketMQ之快速入门

消费者使用

创建一个Consumer类

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

import org.apache.rocketmq.remoting.common.RemotingHelper;

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Consumer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

public static void main(String[] args) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");

consumer.setNamesrvAddr(NAME_SRV_ADDR);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最后端开始消费

consumer.subscribe("test_quick_topic",//订阅的主题

"*");// *代表包含所有

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {

MessageExt messageExt = msgs.get(0);

try {

String topic = messageExt.getTopic();

String tags = messageExt.getTags();

String keys = messageExt.getKeys();

String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody);

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

consumer.start();

}

}

点击运行后,我们可以在控制台看到已经收到5条结果。

【Java】深入学习RocketMQ之快速入门

消息失败重试

下面我们测试一下消息发送失败的情况。

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Consumer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

public static void main(String[] args) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");

consumer.setNamesrvAddr(NAME_SRV_ADDR);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最后端开始消费

consumer.subscribe("test_quick_topic",//订阅的主题

"*");// *代表包含所有

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {

MessageExt messageExt = msgs.get(0);

try {

String topic = messageExt.getTopic();

String tags = messageExt.getTags();

String keys = messageExt.getKeys();

if (keys.equals("KeyA_1")) {

int i = 1 / 0; // 抛出异常

}

String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody);

} catch (Exception e) {

e.printStackTrace();

int reconsumeTimes = messageExt.getReconsumeTimes(); // 失败次数

System.out.println("失败消息已被重发次数:"+ reconsumeTimes);

if(reconsumeTimes == 3){

// 记录日志...

// 补偿机制...

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

consumer.start();

}

}

我们重新启动Producer和Consumer。

可以看到RocketMQ会在一段时间间隔后重新发送此消息,直到达到三次我们进行SUCCESS做日志或者补偿机制。

【Java】深入学习RocketMQ之快速入门

四种集群环境构建详解

【Java】深入学习RocketMQ之快速入门

Name Server

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

Producer

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

Consumer

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。

集群模式之-单点模式

这种模式很明显,一旦节点挂掉,整体服务就不可用了。

集群模式之-主从模式

多Master多Slave模式,同步双写(NM-NS,SYNC)

  • 每个Master配备一个Slave,共有多对Master-Slave,HA采用同步双写机制,主从都写入消息成功后,再向应用返回ACK。
  • 优点:数据与服务都无单点故障问题,Master宕机情况下,消息无延迟,服务可用性和数据可用性都非常高。
  • 缺点:性能比异步复制略低,大概低10%,发送单个消息的RT会略高。目前宕机情况下,从节点不能自动切换成主节点,后续会支持自动切换功能。

多Master多Slave模式,异步复制(NM-NS,ASYNC)

  • 每个Master配备一个Slave,共有多对Master-Slave,HA采用异步复制方式,主从有短暂消息延迟,毫秒级别。
  • 优点:即使磁盘损坏,消息的丢失也非常少,而且消息的实时性不会受到影响,因为Master宕机后,消费者仍然可以从Slave中消费消息,此过程对应用完全透明,不需要人工干预,性能同多Master模式几乎一样。
  • 缺点:Master宕机后,如果磁盘出现损坏,可能丢失少量消息。

集群模式之-双主模式

双Master模式/多Master模式(2M)

  • 一个集群无Slave,全是Master,例如2个Master或者3个Master。
  • 优点:配置简单,单个Master宕机或者重启对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘可靠性非常高,消息也不会丢失(异步刷盘丢失少量消息,同步刷盘完全不丢失),性能最高。
  • 缺点:单台机器宕机时,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会受到影响。

另外还有双主双从模式、多主多从模式。

主从集群模式搭建

主节点:192.168.3.160

从节点:192.168.3.161

首先打开主节点

vim /etc/hosts

增加161节点数据

192.168.3.161 rocketmq-nameserver2

192.168.3.161 rocketmq-master1-slave

然后将4条数据复制到161节点的hosts文件中。

接着我们把tar.gz复制到161。

scp apache-rocketmq.tar.gz 192.168.3.161:/usr/local/

还是之前的操作

# 创建文件夹

mkdir /usr/local/apache-rocketmq

# 然后解压

tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq

# 建立软连接

ln -s apache-rocketmq rocketmq

创建存储路径。

mkdir /usr/local/rocketmq/store

mkdir /usr/local/rocketmq/store/commitlog

mkdir /usr/local/rocketmq/store/consumequeue

mkdir /usr/local/rocketmq/store/index

修改日志配置文件。

mkdir -p /usr/local/rocketmq/logs

cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

修改启动脚本参数。

vim /usr/local/rocketmq/bin/runbroker.sh

#==============================================================================

# 开发环境 JVM Configuration

#==============================================================================

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g

vim /usr/local/rocketmq/bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:PermSize=128m -

XX:MaxPermSize=320m"

接下来修改配置

我们进入160服务器。

cd /usr/local/rocketmq/conf/2m-2s-async

首先我们修改

vim broker-a.properties

增加2节点的地址。

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

随后我们修改

vim broker-a-s.properties

brokerClusterName=rocketmq-cluster

#broker 名字,注意此处不同的配置文件填写的不一样

brokerName=broker-a

#0 表示 Master,>0 表示 Slave

brokerId=1

#nameServer 地址,分号分割 一定要和我们配置的hosts里的相同

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数

defaultTopicQueueNums=4

#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭

autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口

listenPort=10911

#删除文件时间点,默认凌晨 4 点

deleteWhen=04

#文件保留时间,默认 48 小时

fileReservedTime=120

#commitLog 每个文件的大小默认 1G

mapedFileSizeCommitLog=1073741824

#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整

mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000

#redeleteHangedFileInterval=120000

#检测物理文件磁盘空间

diskMaxUsedSpaceRatio=88

#存储路径

storePathRootDir=/usr/local/rocketmq/store

#commitLog 存储路径

storePathCommitLog=/usr/local/rocketmq/store/commitlog

#消费队列存储路径存储路径

storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue

#消息索引存储路径

storePathIndex=/usr/local/rocketmq/store/index

#checkpoint 文件存储路径

storeCheckpoint=/usr/local/rocketmq/store/checkpoint

#abort 文件存储路径

abortFile=/usr/local/rocketmq/store/abort

#限制的消息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker 的角色

#- ASYNC_MASTER 异步复制 Master

#- SYNC_MASTER 同步双写 Master

#- SLAVE

brokerRole=SLAVE

#刷盘方式

#- ASYNC_FLUSH 异步刷盘

#- SYNC_FLUSH 同步刷盘

flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false

#发消息线程池数量

#sendMessageThreadPoolNums=128

#拉消息线程池数量

#pullMessageThreadPoolNums=128

修改完成后保存,然后执行复制命令拷贝到161节点。

scp broker-a.properties broker-a-s.properties 192.168.3.161:/usr/local/rocketmq/conf/2m-2s-async/

检查没有问题后,我们回到160节点。

进入bin目录,进行启动,同时161节点同样。

nohup sh mqnamesrv &

随后启动Broker

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

[[email protected] bin]# jps

4642 NamesrvStartup

4761 BrokerStartup

4777 Jps

切换到161,我们执行下面命令,注意启动的配置是broker-a-s.properties

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &

[[email protected] bin]# jps

23377 NamesrvStartup

23524 Jps

23414 BrokerStartup

随后修改我们的web项目配置。增加161地址。

rocketmq.config.namesrvAddr=192.168.3.160:9876;192.168.3.161:9876

重启后查看Cluster。可以看到已经有两个节点。

【Java】深入学习RocketMQ之快速入门

主从模式高可用机制故障演练

Producer类:

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Producer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161";

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");

producer.setNamesrvAddr(NAME_SRV_ADDR);

producer.start();

//1.创建消息

Message message = new Message("test_quick_topic",//主题

"TagA_", // 标签

"KeyA_", //用户自定义key,唯一标识

"Hello RocketMQ".getBytes());//消息内容实体

//2.发送消息

SendResult result = producer.send(message);

System.out.println("消息发送结果:" + result);

producer.shutdown();

}

}

Consumer类:

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Consumer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161";

public static void main(String[] args) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");

consumer.setNamesrvAddr(NAME_SRV_ADDR);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最后端开始消费

consumer.subscribe("test_quick_topic",//订阅的主题

"*");// *代表包含所有

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {

MessageExt messageExt = msgs.get(0);

try {

String topic = messageExt.getTopic();

String tags = messageExt.getTags();

String keys = messageExt.getKeys();

String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody);

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

consumer.start();

}

}

我们启动Producer类。查看控制台。没有问题。

消息发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A803A5174918B4AAC284383C9C0000, offsetMsgId=C0A803A000002A9F0000000000000FE0, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=3], queueOffset=3]

这个时候我们停止160主节点。

sh mqshutdown broker

可以看到目前只有slave节点。

【Java】深入学习RocketMQ之快速入门

随后我们启动Consumer类。可以看到消息依然可以被消费。

【Java】深入学习RocketMQ之快速入门

javarocketmq

阅读 32发布于 今天 12:38

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议


Java菜鸟程序员

Be a good developer .微信公众号:Java菜鸟程序员

avatar

又坏又迷人

codeing.....

532 声望

299 粉丝

0 条评论

得票时间

avatar

又坏又迷人

codeing.....

532 声望

299 粉丝

宣传栏

RocketMQ - 整体介绍

简介

  • RocketMQ是一款分布式、队列模型的消息中间件。
  • 支持集群模型、负载均衡、水平扩展能力。
  • 采用零拷贝的原理、顺序写盘、随机读。
  • 代码优秀,底层通信框架使用 Netty 。
  • 强调集群无单点,可扩展,任意一点高可用,水平可扩展。
  • 消息失败重试机制、消息可查询。

RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件,具有以下特性:

  1. 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型。
  2. 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递。
  3. 支持拉(pull)和推(push)两种消息模式。
  4. 单一队列百万消息的堆积能力。
  5. 支持多种消息协议,如 JMS、MQTT 等。
  6. 分布式高可用的部署架构,满足至少一次消息传递语义。
  7. 提供 docker 镜像用于隔离测试和云集群部署。
  8. 提供配置、指标和监控等功能丰富的 Dashboard。

概念模型

Producer:消息生产者,负责生产消息,一般由业务系统负责产生消息。

Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

Push Consumer:Consumer的一种,需要向Consumer对象注册监听。

Pull Consumer:Consumer的一种,需要主动请求Broker拉取消息。

Producer Group:生产者集合,一般用于发送一类消息。

Consumer Group:消费者集合,一般用于接收一类消息进行消费。

Broker:MQ消息服务(中转角色,用于消息存储于生产消息转发)。

环境搭建

环境:JDK8、Centos7、RocketMQ 4.3

首先我们编辑Hosts

vim /etc/hosts

加入下面两句话,修改为你自己的ip。

192.168.3.160 rocketmq-nameserver1

192.168.3.160 rocketmq-master1

随后我们将RocketMQ tar.gz传入服务器。

传入之后我们创建文件夹。

# 创建文件夹

mkdir /usr/local/apache-rocketmq

# 然后解压

tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq

# 建立软连接

ln -s apache-rocketmq rocketmq

创建存储路径。

mkdir /usr/local/rocketmq/store

mkdir /usr/local/rocketmq/store/commitlog

mkdir /usr/local/rocketmq/store/consumequeue

mkdir /usr/local/rocketmq/store/index

修改配置文件。

vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties

brokerClusterName=rocketmq-cluster

#broker 名字,注意此处不同的配置文件填写的不一样

brokerName=broker-a

#0 表示 Master,>0 表示 Slave

brokerId=0

#nameServer 地址,分号分割 一定要和我们配置的hosts里的相同

namesrvAddr=rocketmq-nameserver1:9876

#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数

defaultTopicQueueNums=4

#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭

autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口

listenPort=10911

#删除文件时间点,默认凌晨 4 点

deleteWhen=04

#文件保留时间,默认 48 小时

fileReservedTime=120

#commitLog 每个文件的大小默认 1G

mapedFileSizeCommitLog=1073741824

#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整

mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000

#redeleteHangedFileInterval=120000

#检测物理文件磁盘空间

diskMaxUsedSpaceRatio=88

#存储路径

storePathRootDir=/usr/local/rocketmq/store

#commitLog 存储路径

storePathCommitLog=/usr/local/rocketmq/store/commitlog

#消费队列存储路径存储路径

storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue

#消息索引存储路径

storePathIndex=/usr/local/rocketmq/store/index

#checkpoint 文件存储路径

storeCheckpoint=/usr/local/rocketmq/store/checkpoint

#abort 文件存储路径

abortFile=/usr/local/rocketmq/store/abort

#限制的消息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker 的角色

#- ASYNC_MASTER 异步复制 Master

#- SYNC_MASTER 同步双写 Master

#- SLAVE

brokerRole=ASYNC_MASTER

#刷盘方式

#- ASYNC_FLUSH 异步刷盘

#- SYNC_FLUSH 同步刷盘

flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false

#发消息线程池数量

#sendMessageThreadPoolNums=128

#拉消息线程池数量

#pullMessageThreadPoolNums=128

修改日志配置文件。

mkdir -p /usr/local/rocketmq/logs

cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

修改启动脚本参数。

vim /usr/local/rocketmq/bin/runbroker.sh

#==============================================================================

# 开发环境 JVM Configuration

#==============================================================================

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g

vim /usr/local/rocketmq/bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:PermSize=128m -

XX:MaxPermSize=320m"

启动 NameServer

cd /usr/local/rocketmq/bin

nohup sh mqnamesrv &

# 使用jps查看

[[email protected] bin]# jps

22321 NamesrvStartup

22335 Jps

启动BrokerServer

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

# jps查看

[[email protected] bin]# jps

22321 NamesrvStartup

22535 Jps

22440 BrokerStartup

控制台使用

下载代码:https://github.com/apache/roc...

打开rocketmq-console。

修改properties

rocketmq.config.namesrvAddr=192.168.3.160:9876

启动代码,访问localhost:8080

【Java】深入学习RocketMQ之快速入门

RocketMQ - 急速入门

生产者使用

首先我们创建一个SpringBoot项目,引入RocketMQ依赖。

<dependency>

<groupId>org.apache.rocketmq</groupId>

<artifactId>rocketmq-client</artifactId>

<version>4.3.0</version>

</dependency>

创建一个Producer类

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Producer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");

producer.setNamesrvAddr(NAME_SRV_ADDR);

producer.start();

for (int i = 0; i < 5; i++) {

//1.创建消息

Message message = new Message("test_quick_topic",//主题

"TagA_" + i, // 标签

"KeyA_" + i, //用户自定义key,唯一标识

"Hello RocketMQ".getBytes());//消息内容实体

//2.发送消息

SendResult result = producer.send(message);

System.out.println("消息发送结果:" + result);

}

producer.shutdown();

}

}

点击运行后,我们可以在控制台看到5条结果已经发送成功。

【Java】深入学习RocketMQ之快速入门

我们打开web管理界面可以看到今天有五条消息进来。

【Java】深入学习RocketMQ之快速入门

并且在Message里我们可以看到五条消息。

【Java】深入学习RocketMQ之快速入门

消费者使用

创建一个Consumer类

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

import org.apache.rocketmq.remoting.common.RemotingHelper;

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Consumer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

public static void main(String[] args) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");

consumer.setNamesrvAddr(NAME_SRV_ADDR);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最后端开始消费

consumer.subscribe("test_quick_topic",//订阅的主题

"*");// *代表包含所有

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {

MessageExt messageExt = msgs.get(0);

try {

String topic = messageExt.getTopic();

String tags = messageExt.getTags();

String keys = messageExt.getKeys();

String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody);

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

consumer.start();

}

}

点击运行后,我们可以在控制台看到已经收到5条结果。

【Java】深入学习RocketMQ之快速入门

消息失败重试

下面我们测试一下消息发送失败的情况。

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Consumer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876";

public static void main(String[] args) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");

consumer.setNamesrvAddr(NAME_SRV_ADDR);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最后端开始消费

consumer.subscribe("test_quick_topic",//订阅的主题

"*");// *代表包含所有

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {

MessageExt messageExt = msgs.get(0);

try {

String topic = messageExt.getTopic();

String tags = messageExt.getTags();

String keys = messageExt.getKeys();

if (keys.equals("KeyA_1")) {

int i = 1 / 0; // 抛出异常

}

String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody);

} catch (Exception e) {

e.printStackTrace();

int reconsumeTimes = messageExt.getReconsumeTimes(); // 失败次数

System.out.println("失败消息已被重发次数:"+ reconsumeTimes);

if(reconsumeTimes == 3){

// 记录日志...

// 补偿机制...

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

consumer.start();

}

}

我们重新启动Producer和Consumer。

可以看到RocketMQ会在一段时间间隔后重新发送此消息,直到达到三次我们进行SUCCESS做日志或者补偿机制。

【Java】深入学习RocketMQ之快速入门

四种集群环境构建详解

【Java】深入学习RocketMQ之快速入门

Name Server

Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。

Producer

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

Consumer

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。

集群模式之-单点模式

这种模式很明显,一旦节点挂掉,整体服务就不可用了。

集群模式之-主从模式

多Master多Slave模式,同步双写(NM-NS,SYNC)

  • 每个Master配备一个Slave,共有多对Master-Slave,HA采用同步双写机制,主从都写入消息成功后,再向应用返回ACK。
  • 优点:数据与服务都无单点故障问题,Master宕机情况下,消息无延迟,服务可用性和数据可用性都非常高。
  • 缺点:性能比异步复制略低,大概低10%,发送单个消息的RT会略高。目前宕机情况下,从节点不能自动切换成主节点,后续会支持自动切换功能。

多Master多Slave模式,异步复制(NM-NS,ASYNC)

  • 每个Master配备一个Slave,共有多对Master-Slave,HA采用异步复制方式,主从有短暂消息延迟,毫秒级别。
  • 优点:即使磁盘损坏,消息的丢失也非常少,而且消息的实时性不会受到影响,因为Master宕机后,消费者仍然可以从Slave中消费消息,此过程对应用完全透明,不需要人工干预,性能同多Master模式几乎一样。
  • 缺点:Master宕机后,如果磁盘出现损坏,可能丢失少量消息。

集群模式之-双主模式

双Master模式/多Master模式(2M)

  • 一个集群无Slave,全是Master,例如2个Master或者3个Master。
  • 优点:配置简单,单个Master宕机或者重启对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘可靠性非常高,消息也不会丢失(异步刷盘丢失少量消息,同步刷盘完全不丢失),性能最高。
  • 缺点:单台机器宕机时,这台机器上未被消费的消息在机器恢复之前不可订阅,消息的实时性会受到影响。

另外还有双主双从模式、多主多从模式。

主从集群模式搭建

主节点:192.168.3.160

从节点:192.168.3.161

首先打开主节点

vim /etc/hosts

增加161节点数据

192.168.3.161 rocketmq-nameserver2

192.168.3.161 rocketmq-master1-slave

然后将4条数据复制到161节点的hosts文件中。

接着我们把tar.gz复制到161。

scp apache-rocketmq.tar.gz 192.168.3.161:/usr/local/

还是之前的操作

# 创建文件夹

mkdir /usr/local/apache-rocketmq

# 然后解压

tar -zxvf apache-rocketmq.tar.gz -C /usr/local/apache-rocketmq

# 建立软连接

ln -s apache-rocketmq rocketmq

创建存储路径。

mkdir /usr/local/rocketmq/store

mkdir /usr/local/rocketmq/store/commitlog

mkdir /usr/local/rocketmq/store/consumequeue

mkdir /usr/local/rocketmq/store/index

修改日志配置文件。

mkdir -p /usr/local/rocketmq/logs

cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

修改启动脚本参数。

vim /usr/local/rocketmq/bin/runbroker.sh

#==============================================================================

# 开发环境 JVM Configuration

#==============================================================================

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g

vim /usr/local/rocketmq/bin/runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:PermSize=128m -

XX:MaxPermSize=320m"

接下来修改配置

我们进入160服务器。

cd /usr/local/rocketmq/conf/2m-2s-async

首先我们修改

vim broker-a.properties

增加2节点的地址。

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

随后我们修改

vim broker-a-s.properties

brokerClusterName=rocketmq-cluster

#broker 名字,注意此处不同的配置文件填写的不一样

brokerName=broker-a

#0 表示 Master,>0 表示 Slave

brokerId=1

#nameServer 地址,分号分割 一定要和我们配置的hosts里的相同

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数

defaultTopicQueueNums=4

#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭

autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口

listenPort=10911

#删除文件时间点,默认凌晨 4 点

deleteWhen=04

#文件保留时间,默认 48 小时

fileReservedTime=120

#commitLog 每个文件的大小默认 1G

mapedFileSizeCommitLog=1073741824

#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整

mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000

#redeleteHangedFileInterval=120000

#检测物理文件磁盘空间

diskMaxUsedSpaceRatio=88

#存储路径

storePathRootDir=/usr/local/rocketmq/store

#commitLog 存储路径

storePathCommitLog=/usr/local/rocketmq/store/commitlog

#消费队列存储路径存储路径

storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue

#消息索引存储路径

storePathIndex=/usr/local/rocketmq/store/index

#checkpoint 文件存储路径

storeCheckpoint=/usr/local/rocketmq/store/checkpoint

#abort 文件存储路径

abortFile=/usr/local/rocketmq/store/abort

#限制的消息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker 的角色

#- ASYNC_MASTER 异步复制 Master

#- SYNC_MASTER 同步双写 Master

#- SLAVE

brokerRole=SLAVE

#刷盘方式

#- ASYNC_FLUSH 异步刷盘

#- SYNC_FLUSH 同步刷盘

flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false

#发消息线程池数量

#sendMessageThreadPoolNums=128

#拉消息线程池数量

#pullMessageThreadPoolNums=128

修改完成后保存,然后执行复制命令拷贝到161节点。

scp broker-a.properties broker-a-s.properties 192.168.3.161:/usr/local/rocketmq/conf/2m-2s-async/

检查没有问题后,我们回到160节点。

进入bin目录,进行启动,同时161节点同样。

nohup sh mqnamesrv &

随后启动Broker

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

[[email protected] bin]# jps

4642 NamesrvStartup

4761 BrokerStartup

4777 Jps

切换到161,我们执行下面命令,注意启动的配置是broker-a-s.properties

nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &

[[email protected] bin]# jps

23377 NamesrvStartup

23524 Jps

23414 BrokerStartup

随后修改我们的web项目配置。增加161地址。

rocketmq.config.namesrvAddr=192.168.3.160:9876;192.168.3.161:9876

重启后查看Cluster。可以看到已经有两个节点。

【Java】深入学习RocketMQ之快速入门

主从模式高可用机制故障演练

Producer类:

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Producer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161";

public static void main(String[] args) throws Exception {

DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");

producer.setNamesrvAddr(NAME_SRV_ADDR);

producer.start();

//1.创建消息

Message message = new Message("test_quick_topic",//主题

"TagA_", // 标签

"KeyA_", //用户自定义key,唯一标识

"Hello RocketMQ".getBytes());//消息内容实体

//2.发送消息

SendResult result = producer.send(message);

System.out.println("消息发送结果:" + result);

producer.shutdown();

}

}

Consumer类:

/**

* @author 又坏又迷人

* 公众号: Java菜鸟程序员

* @date 2021/1/26

* @Description:

*/

public class Consumer {

public static final String NAME_SRV_ADDR = "192.168.3.160:9876;192.168.3.161";

public static void main(String[] args) throws Exception {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");

consumer.setNamesrvAddr(NAME_SRV_ADDR);

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //从最后端开始消费

consumer.subscribe("test_quick_topic",//订阅的主题

"*");// *代表包含所有

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, consumeConcurrentlyContext) -> {

MessageExt messageExt = msgs.get(0);

try {

String topic = messageExt.getTopic();

String tags = messageExt.getTags();

String keys = messageExt.getKeys();

String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

System.out.println("topic:" + topic + ", tags : " + tags + ", keys :" + keys + ", msgBody:" + msgBody);

} catch (Exception e) {

e.printStackTrace();

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

consumer.start();

}

}

我们启动Producer类。查看控制台。没有问题。

消息发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A803A5174918B4AAC284383C9C0000, offsetMsgId=C0A803A000002A9F0000000000000FE0, messageQueue=MessageQueue [topic=test_quick_topic, brokerName=broker-a, queueId=3], queueOffset=3]

这个时候我们停止160主节点。

sh mqshutdown broker

可以看到目前只有slave节点。

【Java】深入学习RocketMQ之快速入门

随后我们启动Consumer类。可以看到消息依然可以被消费。

【Java】深入学习RocketMQ之快速入门

以上是 【Java】深入学习RocketMQ之快速入门 的全部内容, 来源链接: utcz.com/a/108521.html

回到顶部