1、RocketMq系列,第一章:安装与快速入门
RocketMq系列,第一章
1、MQ介绍
1.1、为什么要用MQ
消息队列是一种“先进先出”的数据结构
其应用场景主要包含以下3个方面
1.1.1、应用解耦
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
使用消息队列解耦合。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
1.1.2、流量削封
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。
一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总体下单体验要好。
出于经济考量的目的
业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰。
1.1.3、数据分发
通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。
1.2、MQ的优缺点
优点:解耦、削峰、数据分发
缺点:
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
【需保证MQ的高可用】
系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
【保证消息不被重复消费,保证消息不丢失,保证消息传递的顺序性】
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。
【如何保证消息数据处理的一致性?】
1.3、常用MQ比较
2、RocketMq快速入门
RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,在阿里内部,RocketMQ承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。
2.1、下载安装
RocketMq最新版本:4.6.0 release
解压,进入安装目录
#解压:unzip rocketmq-all-4.6.0-bin-release.zip
#移动至opt
mv rocketmq-all-4.6.0-bin-release /opt
#重命名
mv rocketmq-all-4.6.0-bin-release rocketmq
环境变量配置
##进入配置文件vim /etc/profile
##设置环境变量--不在bin目录下 也可以使用rocketmq命令
ROCKETMQ_HOME=/opt/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
##使配置文件生肖
source /etc/profile
目录结构
bin: 启动脚本等conf: 实例配置文件,如broker的配置文件,集群的配置文件,logback的配置文件等
lib: rockermq依赖jar包
2.2、RocketMq角色概念
2.2.1)、消息生产者Producer
负责生产消息,一般由业务系统负责生产消息。----类似:发信者 消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。
2.2.2)、消息消费者Consumer
负责消费消息,一般是后台系统负责异步消费。----类似:收信者 消费者,从Broker拉取消息进行消费。从应用角度来说有两类消费者:
①、PullConsumer:应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
②、PushConsumer:该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
2.2.3)、Broker
broker主要负责消息的存储、投递和查询以及服务高可用保证。--类似邮局主要包含以下几个重要模块:
①、Remoting Module:整个broker的实体,负责处理来自clients端的请求。
②、Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的topic订阅信息
③、Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
④、HA Service:高可用服务,提供master broker 和 slave broker之间的数据同步功能。
⑤、Index Service:根据特定的Message key对投递到broker的消息进行索引服务,以提供消息的快速查询。
2.2.4)、Name Server
NameServer其角色类似dubbo中的zookeeper--类似邮局管理者支持Broker的动态注册与发现。主要包括两个功能:
①、Broker管理。
NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。
②、路由信息管理。
每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Produce,Consumer仍然可以动态感知Broker的路由的信息。
2.3、Message相关的结构
message是RocketMq消息传递的载体,其数据结构如下:
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
}
2.3.1)、Topic
topic我们可以理解为第一级消息类型,类比于书的标题。官方是这么解释的:
Topic是生产者发送消息和消费者拉取消息时的消息的类别。Topic与生产者和消费者之间的关系非常松散。具体来说,一个Topic可能有0个,一个或多个生产者向它发送消息;相反,一个生产者可以发送不同类型Topic的消息。类似的,消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可。
##在producer中使用消息Message msg = new Message("baseTopic","tag","hello world..".getBytes());
##在consumer中订阅消息
consumer.subscribe("baseTopic","tag");
2.3.2)、Tag
tag我们可以理解为第二级消息类型,类比于书的目录。便于检索和使用消息。官方是这样定义的:
Tag,换句话的意思就是子主题,为用户提供了额外的灵活性。有了标签,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的标记。标签有助于保持代码的清晰和连贯,同时标签也方便RocketMQ提供的查询功能。
2.3.3)、Body
producer要发送的消息内容体,以字节数组的形式进行存储。Message消息会有一定的大小限制。
2.3.4)、TransactionId
TransactionId是当消息是事务消息的时候,相关消息的事务编号。
2.3.5)、Properties
该字段是一个K-V结构。RockerMq预定义了一组内置属性,Properties中存储了Message其余各项属性。除了内置属性之外,用户还可以设定
任意自定义属性。(属性的大小也是有大小限制的)
系统预定义的属性在org.apache.rocketmq.common.message.MessageConst类中
2.3.6)、其他
groupName:
RocketMq也有组的概念。代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组。作用:
在集群中,当一个生产者producer宕机,本地事务回滚后,可以继续联系该group下的另外一个生产者实例,不至于导致业务走不下去。
在消费者group中,可以实现消费消息的负载均衡和消息容错。
此外,有了groupName后,集群的动态扩容就非常方便了。只需要在新加入的机器中配置相同的groupName,该机器启动后,即可立即加入到所在的group中,参与消息的生产和消费。
producer使用组:
//使用GroupName来初始化Producer,如果不指定,就会使用默认的组名:DEFAULT_PRODUCER
DefaultMQProducer producer = new DefaultMQProducer("group_name_1");
consumer使用组:
//使用GroupName来初始化Consumer,如果不指定,就会使用默认的名字:DEFAULT_CONSUMER
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name_1");
2.3.7)、MessageExt的结构
对于发送方来说上述Message的定义已经足够,但对于RocketMq的这个处理流程来说,还需要更多的字段来记录消息的一些必要内容。
比如消息的ID,创建时间,存储时间等。这就是MessageExt
public class MessageExt extends Message { private static final long serialVersionUID = 5720810158625748049L;
private int queueId;
private int storeSize;
private long queueOffset;
private int sysFlag;
private long bornTimestamp;
private SocketAddress bornHost;
private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
private long commitLogOffset;
private int bodyCRC;
private int reconsumeTimes;
private long preparedTransactionOffset;
}
queueId
记录MessageQueue编号,消息会被发送到Topic下的MessageQueue
storeSize
记录消息在Broker存盘大小
queueOffset
记录在ConsumeQueue中的偏移
sysFlag
记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
bornTimestamp
消息创建时间,在Producer发送消息时设置
storeHost
记录存储该消息的Broker地址
msgId
消息Id
commitLogOffset
记录在Broker中存储偏移
bodyCRC
消息内容CRC校验值
reconsumeTimes
消息重试消费次数
preparedTransactionOffset
事务详细相关字段
2.4、RockerMq常用命令
常用端口:
##防火墙中记得开启nameserver用:9876
producer用: 10909
consumer用:10911
阿里云端口安全组配置:
2.4.1)、启动命令
启动 NameServer
# 1、启动nameservernohup sh mqnamesrv &
#或
nohup sh mqnamesrv -n 你的IP:9876 &
# 2、查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
启动Broker
# 1.启动Brokernohup sh mqbroker -n localhost:9876 &
#或
nohup sh mqbroker -n 你namesrv的IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
问题-注意:
由于RockerMq默认的内存都比较大,启动的时候会因为内存不足而导致失败。需要编辑如下两个配置文件,修改JVM内存的大小。
## 编辑runserver.shJAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 改为:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
## 编辑runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 改为:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
消息测试:
## consumer接收消息# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
#######################
## producer发送消息
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
注:可以用 jps 查看 启动状态
2.4.2)、关闭命令
# 1.关闭NameServersh mqshutdown namesrv
# 2.关闭Broker
sh mqshutdown broker
注:可以用 jps 查看 启动状态
2.4.3)、mqadmin命令集合
# 在RocketMQ的bin目录下有一个mqadmin脚本,它充当着控制台的角色,可以用来完成我们常用的操作。# 如不喜欢命令可安装第三方的可视化操控界面工具
## 查看有哪些命令
sh mqadmin
## 查看具体命令怎么用
sh mqadmin help xxx
2.4.4)、group命令
①、创建订阅组group
sh mqadmin updateSubGroup -n localhost:9876 -b localhost:10911 -g myGroup# 参数: -b broker地址 -c 集群名称 -g 订阅组名称 -n nameserve服务地址列表,格式ip:port;ip:port
②、删除订阅组Group
sh mqadmin deleteSubGroup -n localhost:9876 -b localhost:10911 -g myGroup# 参数: -b broker地址 -c 集群名称 -g 订阅组名称 -n nameserve服务地址列表,格式ip:port;ip:port
③、查看消费组Group
sh mqadmin consumerProgress -n localhost:9876
2.4.5)、Topic命令
①、创建Topic
sh mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t myTopic# 参数 -n为nameServe服务地址 -b为broker服务地址 -t为topic的名称
②、删除Topic
sh mqadmin deleteTopic -n localhost:9876 -t myTopic#参数 -n nameserve 服务地址列表,格式ip:port;ip:port; -t为topic名称
③、查询所有Topic
sh mqadmin topicList -n localhost:9876# 参数 -n为nameServe服务地址
④、查看topic统计信息
sh mqadmin topicStatus -n localhost:9876 -t myTopic# 参数 -n为nameServe服务地址 -t为topic的名字
其他:
其他命令就不在这里叙述,可以查看相关帮助命令,如:sh mqadmin 或 sh mqadmin help。
如不喜欢命令可安装第三方的可视化操控界面工具
2.5、常用配置信息
核心配置信息:
#所属集群名字brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
其他配置信息:
rocketmqHome
RockerMQ主目录,默认用户主目录
namesrvAddr
NameServer地址
kvConfigPath
kv配置文件路径,包含顺序消息主题的配置信息
configStorePath
NameServer配置文件路径,建议使用-c指定NameServer配置文件路径
clusterTest
是否开启集群测试,默认为false
orderMessageEnable
是否支持顺序消息,默认为false
accessMessageInMemoryMaxRatio
访问消息在内存中比率,默认为40
40
adminBrokerThreadPoolNums
服务端处理控制台管理命令线程池线程数量
16
autoCreateSubscriptionGroup
是否自动创建消费组
true
autoCreateTopicEnable
是否自动创建主题
true
bitMapLengthConsumeQueueExt
ConsumeQueue扩展过滤bitmap大小
112
brokerClusterName
Broker集群名称
TestCluster
brokerFastFailureEnable
是否支持broker快速失败 如果为true表示会立即清除发送消息线程池,消息拉取线程池中排队任务 ,直接返回系统错误
true
brokerId
brokerID 0表示主节点 大于0表示从节点
0
brokerIP1
Broker服务地址
brokerIP2
BrokerHAIP地址,供slave同步消息的地址
brokerName
Broker服务器名称morning服务器hostname
broker-a
brokerPermission
Broker权限 默认为6表示可读可写
6
brokerRole
broker角色,分为 ASYNC_MASTER SYNC_MASTER, SLAVE
ASYNC_MASTER
brokerTopicEnable
broker名称是否可以用做主体使用
true
channelNotActiveInterval
60000
checkCRCOnRecover
文件恢复时是否校验CRC
true
cleanFileForciblyEnable
是否支持强行删除过期文件
true
cleanResourceInterval
清除过期文件线程调度频率
10000
clientAsyncSemaphoreValue
65535
clientCallbackExecutorThreads
8
clientChannelMaxIdleTimeSeconds
120
clientCloseSocketIfTimeout
false
clientManagerThreadPoolQueueCapacity
客户端管理线程池任务队列初始大小
1000000
clientManageThreadPoolNums
服务端处理客户端管理(心跳 注册 取消注册线程数量)
32
clientOnewaySemaphoreValue
65535
clientPooledByteBufAllocatorEnable
false
clientSocketRcvBufSize
客户端socket接收缓冲区大小
131072
clientSocketSndBufSize
客户端socket发送缓冲区大小
131072
clientWorkerThreads
4
clusterTopicEnable
集群名称是否可用在主题使用
true
commercialBaseCount
1
commercialBigCount
1
commercialEnable
true
commercialTimerCount
1
commercialTransCount
1
commitCommitLogLeastPages
一次提交至少需要脏页的数量,默认4页,针对 commitlog文件
4
commitCommitLogThoroughInterval
Commitlog两次提交的最大间隔,如果超过该间隔,将忽略commitCommitLogLeastPages直接提交
200
commitIntervalCommitLog
commitlog提交频率
200
compressedRegister
false
connectTimeoutMillis
链接超时时间
3000
consumerFallbehindThreshold
消息消费堆积阈值默认16GB在disableConsumeifConsumeIfConsumerReadSlowly为true时生效
17179869184
consumerManagerThreadPoolQueueCapacity
消费管理线程池任务队列大小
1000000
consumerManageThreadPoolNums
服务端处理消费管理 获取消费者列表 更新消费者进度查询消费进度等
32
debugLockEnable
是否支持 PutMessage Lock锁打印信息
false
defaultQueryMaxNum
查询消息默认返回条数,默认为32
32
defaultTopicQueueNums
主体在一个broker上创建队列数量
8
deleteCommitLogFilesInterval
删除commitlog文件的时间间隔,删除一个文件后等一下再删除一个文件
100
deleteConsumeQueueFilesInterval
删除consumequeue文件时间间隔
100
deleteWhen
磁盘文件空间充足情况下,默认每天什么时候执行删除过期文件,默认04表示凌晨4点
04
destroyMapedFileIntervalForcibly
销毁MappedFile被拒绝的最大存活时间,默认120s。清除过期文件线程在初次销毁mappedfile时,如果该文件被其他线程引用,引用次数大于0.则设置MappedFile的可用状态为false,并设置第一次删除时间,下一次清理任务到达时,如果系统时间大于初次删除时间加上本参数,则将ref次数一次减1000,知道引用次数小于0,则释放物理资源
120000
disableConsumeIfConsumerReadSlowly
如果消费组消息消费堆积是否禁用该消费组继续消费消息
false
diskFallRecorded
是否统计磁盘的使用情况,默认为true
true
diskMaxUsedSpaceRatio
commitlog目录所在分区的最大使用比例,如果commitlog目录所在的分区使用比例大于该值,则触发过期文件删除
75
duplicationEnable
是否允许重复复制,默认为 false
false
enableCalcFilterBitMap
是否开启比特位映射,这个属性不太明白
false
enableConsumeQueueExt
是否启用ConsumeQueue扩展属性
false
enablePropertyFilter
是否支持根据属性过滤 如果使用基于标准的sql92模式过滤消息则改参数必须设置为true
false
endTransactionPoolQueueCapacity
处理提交和回滚消息线程池线程队列大小
100000
endTransactionThreadPoolNums
处理提交和回滚消息线程池
24
expectConsumerNumUseFilter
布隆过滤器参数
32
fastFailIfNoBufferInStorePool
从 transientStorepool中获取 ByteBuffer是否支持快速失败
false
fetchNamesrvAddrByAddressServer
是否支持从服务器获取nameServer
false
fileReservedTime
文件保留时间,默认72小时,表示非当前写文件最后一次更新时间加上filereservedtime小与当前时间,该文件将被清理
120
filterDataCleanTimeSpan
清除过滤数据的时间间隔
86400000
filterServerNums
broker服务器过滤服务器数量
0
filterSupportRetry
消息过滤是否支持重试
false
flushCommitLogLeastPages
一次刷盘至少需要脏页的数量,针对commitlog文件
4
flushCommitLogThoroughInterval
commitlog两次刷盘的最大间隔,如果超过该间隔,将fushCommitLogLeastPages要求直接执行刷盘操作
10000
flushCommitLogTimed
表示await方法等待FlushIntervalCommitlog,如果为true表示使用Thread.sleep方法等待
false
flushConsumeQueueLeastPages
一次刷盘至少需要脏页的数量,默认2页,针对 Consume文件
2
flushConsumeQueueThoroughInterval
Consume两次刷盘的最大间隔,如果超过该间隔,将忽略
60000
flushConsumerOffsetHistoryInterval
fushConsumeQueueLeastPages直接刷盘
60000
flushConsumerOffsetInterval
持久化消息消费进度 consumerOffse.json文件的频率ms
5000
flushDelayOffsetInterval
延迟队列拉取进度刷盘间隔。默认10s
10000
flushDiskType
刷盘方式,默认为 ASYNC_FLUSH(异步刷盘),可选值SYNC_FLUSH(同步刷盘)
ASYNC_FLUSH
flushIntervalCommitLog
commitlog刷盘频率
500
flushIntervalConsumeQueue
consumuQueue文件刷盘频率
1000
flushLeastPagesWhenWarmMapedFile
用字节0填充整个文件的,每多少页刷盘一次。默认4096页,异步刷盘模式生效
4096
forceRegister
是否强制注册
true
haHousekeepingInterval
Master与save长连接空闲时间,超过该时间将关闭连接
20000
haListenPort
Master监听端口,从服务器连接该端口,默认为10912
10912
haMasterAddress
Master服务器IP地址与端口号
haSendHeartbeatInterval
Master与Slave心跳包发送间隔
5000
haSlaveFallbehindMax
允许从服务器落户的最大偏移字节数,默认为256M。超过该值则表示该Slave不可用
268435456
haTransferBatchSize
一次HA主从同步传输的最大字节长度,默认为32K
32768
heartbeatThreadPoolNums
心跳线程池线程数
8
heartbeatThreadPoolQueueCapacity
心跳线程队列数量
50000
highSpeedMode
当前版本未使用
false
listenPort
服务端监听端口
10911
longPollingEnable
是否开启长轮训
true
mapedFileSizeCommitLog
单个conmmitlog文件大小默认1GB
1073741824
mapedFileSizeConsumeQueue
单个consumequeue文件大小默认30W*20表示单个Consumequeue文件中存储30W个ConsumeQueue条目
6000000
mappedFileSizeConsumeQueueExt
ConsumeQueue扩展文件大小默认48MB
50331648
maxDelayTime
当前版本未使用
40
maxErrorRateOfBloomFilter
布隆过滤器参数
20
maxHashSlotNum
单个索引文件hash槽的个数,默认为五百万
5000000
maxIndexNum
单个索引文件索引条目的个数,默认为两千万
20000000
maxMessageSize
默认允许的最大消息体默认4M
4194304
maxMsgsNumBatch
一次查询消息最大返回消息条数,默认64条
64
maxTransferBytesOnMessageInDisk
一次服务消息端消息拉取,消息在磁盘中传输允许的最大字节
65536
maxTransferBytesOnMessageInMemory
一次服务端消息拉取,消息在内存中传输允许的最大传输字节数默认256kb
262144
maxTransferCountOnMessageInDisk
一次消息服务端消息拉取,消息在磁盘中传输允许的最大条数,默认为8条
8
maxTransferCountOnMessageInMemory
一次服务消息拉取,消息在内存中传输运行的最大消息条数,默认为32条
32
messageDelayLevel
延迟队列等级(s=秒,m=分,h=小时)
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
messageIndexEnable
是否支持消息索引文件
true
messageIndexSafe
消息索引是否安全,默认为 false,文件恢复时选择文件检测点(commitlog.consumeque)的最小的与文件最后更新对比,如果为true,文件恢复时选择文件检测点保存的索引更新时间作为对比
false
messageStorePlugIn
消息存储插件地址默认为空字符串
namesrvAddr
nameServer地址
notifyConsumerIdsChangedEnable
消费者数量变化后是否立即通知RebalenceService线程,以便马上进行重新负载
true
offsetCheckInSlave
从服务器是否坚持 offset检测
false
osPageCacheBusyTimeOutMills
putMessage锁占用超过该时间,表示 PageCache忙
1000
pullMessageThreadPoolNums
服务端处理消息拉取线程池线程数量 默认为16加上当前操作系统CPU核数的两倍
32
pullThreadPoolQueueCapacity
消息拉去线程池任务队列初始大小
100000
putMsgIndexHightWater
当前版本未使用
600000
queryMessageThreadPoolNums
服务端处理查询消息线程池数量默认为8加上当前操作系统CPU核数的两倍
16
queryThreadPoolQueueCapacity
查询消息线程池任务队列初始大小
20000
redeleteHangedFileInterval
重试删除文件间隔,配合destorymapedfileintervalforcibly
120000
regionId
消息区域
DefaultRegion
registerBrokerTimeoutMills
注册broker超时时间
6000
registerNameServerPeriod
broker注册频率 大于1分钟为1分钟小于10秒为10秒
30000
rejectTransactionMessage
是否拒绝事物消息
false
rocketmqHome
RocketMQ主目录
/home/rocketmq/rocketmq-all-4.3.2-bin-release
sendMessageThreadPoolNums
服务端处理消息发送线程池数量
1
sendThreadPoolQueueCapacity
消息发送线程池任务队列初始大小
10000
serverAsyncSemaphoreValue
异步消息发送最大并发度
64
serverCallbackExecutorThreads
netty public任务线程池个数,netty网络设计没根据业务类型会创建不同线程池毛笔如处理发送消息,消息消费心跳检测等。如果业务类型(RequestCode)未注册线程池,则由public线程池执行
0
serverChannelMaxIdleTimeSeconds
网络连接最大空闲时间。如果链接空闲时间超过此参数设置的值,连接将被关闭
120
serverOnewaySemaphoreValue
send oneway消息请求并发度
256
serverPooledByteBufAllocatorEnable
ByteBuffer是否开启缓存
true
serverSelectorThreads
IO线程池线程个数,主要是NameServer.broker端解析请求,返回相应的线程个数,这类县城主要是处理网络请求的,解析请求包。然后转发到各个业务线程池完成具体的业务无操作,然后将结果在返回调用方
3
serverSocketRcvBufSize
netty网络socket接收缓存区大小16MB
131072
serverSocketSndBufSize
netty网络socket发送缓存区大小16MB
131072
serverWorkerThreads
netty业务线程池个数
8
shortPollingTimeMills
短轮训等待时间
1000
slaveReadEnable
从节点是否可读
false
startAcceptSendRequestTimeStamp
0
storePathCommitLog
Commitlog存储目录默认为${storePathRootDir}/commitlog
/home/rocketmq/store/commitlog
storePathRootDir
broker存储目录 默认为用户的主目录/store
/home/rocketmq/store
syncFlushTimeout
同步刷盘超时时间
5000
traceOn
true
transactionCheckInterval
事物回查周期
60000
transactionCheckMax
事物回查次数
15
transactionTimeOut
事物回查超时时间
6000
transferMsgByHeap
消息传输是否使用堆内存
true
transientStorePoolEnable
Commitlog是否开启 transientStorePool机制,默认为 false
false
transientStorePoolSize
transientStorePool中缓存 ByteBuffer个数,默认5个
5
useEpollNativeSelector
是否启用Epoll IO模型。Linux环境建议开启
false
useReentrantLockWhenPutMessage
消息存储到commitlog文件时获取锁类型,如果为true使用ReentrantLock否则使用自旋锁
false
useTLS
是否使用安全传输层协议
false
waitTimeMillsInHeartbeatQueue
清理broker心跳线程等待时间
31000
waitTimeMillsInPullQueue
清除消息拉取线程池任务队列的等待时间。如果系统时间减去任务放入队列中的时间小于waitTimeMillsInPullQueue,本次请求任务暂时不移除该任务
5000
waitTimeMillsInSendQueue
清除发送线程池任务队列的等待时间。如果系统时间减去任务放入队列中的时间小于waitTimeMillsInSendQueue,本次请求任务暂时不移除该任务
200
waitTimeMillsInTransactionQueue
清理提交和回滚消息线程队列等待时间
3000
warmMapedFileEnable
是否温和地使用 MappedFile如果为true,将不强制将内存映射文件锁定在内存中
false
connectWhichBroker
FilterServer连接的Broker地址
filterServerIP
FilterServerIP地址,默认为本地服务器IP
compressMsgBodyOverHowmuch
如果消息Body超过该值则启用
zipCompresslevel
Zip压缩方式,默认为5,详细定义请参考java.util.Deflate中的定义
clientUploadFilterClassEnable
是否支持客户端上传 FilterClass代码
filterClassRepertoryUrl
filterClass服务地址,如果 clientUploadFilterClassEnable为false,则需要提供一个地址从该服务器获取过滤类的代码
fsServerAsyncSemaphorevalue
FilterServer异步请求并发度,默认为2048
fsServerCallbackExecutorThreads
处理回调任务的线程池数量,默认为64
fsServerWorkerThreads
远程服务调用线程池数量,默认为64
以上是 1、RocketMq系列,第一章:安装与快速入门 的全部内容, 来源链接: utcz.com/z/512254.html