1、RocketMq系列,第一章:安装与快速入门

编程

RocketMq系列,第一章

1、MQ介绍

1.1、为什么要用MQ

消息队列是一种“先进先出”的数据结构

其应用场景主要包含以下3个方面

1.1.1、应用解耦

​ 系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

​ 使用消息队列解耦合。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

1.1.2、流量削封

​ 应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

​ 一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总体下单体验要好。

​ 出于经济考量的目的

​ 业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰。

1.1.3、数据分发

​ 通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。

1.2、MQ的优缺点

优点:解耦、削峰、数据分发

缺点:

  • 系统可用性降低

    ​ 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。

    【需保证MQ的高可用】

  • 系统复杂度提高

    ​ MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。

​ 【保证消息不被重复消费,保证消息不丢失,保证消息传递的顺序性】

  • 一致性问题

    ​ A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。

    【如何保证消息数据处理的一致性?】

1.3、常用MQ比较

2、RocketMq快速入门

​ RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,在阿里内部,RocketMQ承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。

2.1、下载安装

RocketMq最新版本:4.6.0 release

解压,进入安装目录

#解压:

unzip rocketmq-all-4.6.0-bin-release.zip

#移动至opt

mv rocketmq-all-4.6.0-bin-release /opt

#重命名

mv rocketmq-all-4.6.0-bin-release rocketmq

环境变量配置

##进入配置文件

vim /etc/profile

##设置环境变量--不在bin目录下 也可以使用rocketmq命令

ROCKETMQ_HOME=/opt/rocketmq

PATH=$PATH:$ROCKETMQ_HOME/bin

export ROCKETMQ_HOME PATH

##使配置文件生肖

source /etc/profile

目录结构

bin:  启动脚本等

conf: 实例配置文件,如broker的配置文件,集群的配置文件,logback的配置文件等

lib: rockermq依赖jar包

2.2、RocketMq角色概念

2.2.1)、消息生产者Producer

负责生产消息,一般由业务系统负责生产消息。----类似:发信者

消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。

2.2.2)、消息消费者Consumer

负责消费消息,一般是后台系统负责异步消费。----类似:收信者

消费者,从Broker拉取消息进行消费。从应用角度来说有两类消费者:

①、PullConsumer:应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

②、PushConsumer:该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

2.2.3)、Broker

broker主要负责消息的存储、投递和查询以及服务高可用保证。--类似邮局

主要包含以下几个重要模块:

①、Remoting Module:整个broker的实体,负责处理来自clients端的请求。

②、Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的topic订阅信息

③、Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。

④、HA Service:高可用服务,提供master broker 和 slave broker之间的数据同步功能。

⑤、Index Service:根据特定的Message key对投递到broker的消息进行索引服务,以提供消息的快速查询。

2.2.4)、Name Server

NameServer其角色类似dubbo中的zookeeper--类似邮局管理者

支持Broker的动态注册与发现。主要包括两个功能:

①、Broker管理。

NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。

②、路由信息管理。

每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Produce,Consumer仍然可以动态感知Broker的路由的信息。

2.3、Message相关的结构

message是RocketMq消息传递的载体,其数据结构如下:

public class Message implements Serializable {

private static final long serialVersionUID = 8445773977080406428L;

private String topic;

private int flag;

private Map<String, String> properties;

private byte[] body;

private String transactionId;

}

2.3.1)、Topic

topic我们可以理解为第一级消息类型,类比于书的标题。

官方是这么解释的:

Topic是生产者发送消息和消费者拉取消息时的消息的类别。Topic与生产者和消费者之间的关系非常松散。具体来说,一个Topic可能有0个,一个或多个生产者向它发送消息;相反,一个生产者可以发送不同类型Topic的消息。类似的,消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可。

##在producer中使用消息

Message msg = new Message("baseTopic","tag","hello world..".getBytes());

##在consumer中订阅消息

consumer.subscribe("baseTopic","tag");

2.3.2)、Tag

tag我们可以理解为第二级消息类型,类比于书的目录。便于检索和使用消息。

官方是这样定义的:

Tag,换句话的意思就是子主题,为用户提供了额外的灵活性。有了标签,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的标记。标签有助于保持代码的清晰和连贯,同时标签也方便RocketMQ提供的查询功能。

2.3.3)、Body

producer要发送的消息内容体,以字节数组的形式进行存储。Message消息会有一定的大小限制。

2.3.4)、TransactionId

TransactionId是当消息是事务消息的时候,相关消息的事务编号。

2.3.5)、Properties

该字段是一个K-V结构。

RockerMq预定义了一组内置属性,Properties中存储了Message其余各项属性。除了内置属性之外,用户还可以设定

任意自定义属性。(属性的大小也是有大小限制的)

系统预定义的属性在org.apache.rocketmq.common.message.MessageConst类中

2.3.6)、其他

​ groupName:

RocketMq也有组的概念。代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组。

作用:

在集群中,当一个生产者producer宕机,本地事务回滚后,可以继续联系该group下的另外一个生产者实例,不至于导致业务走不下去。

在消费者group中,可以实现消费消息的负载均衡和消息容错。

此外,有了groupName后,集群的动态扩容就非常方便了。只需要在新加入的机器中配置相同的groupName,该机器启动后,即可立即加入到所在的group中,参与消息的生产和消费。

producer使用组:

//使用GroupName来初始化Producer,如果不指定,就会使用默认的组名:DEFAULT_PRODUCER

DefaultMQProducer producer = new DefaultMQProducer("group_name_1");

consumer使用组:

//使用GroupName来初始化Consumer,如果不指定,就会使用默认的名字:DEFAULT_CONSUMER

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name_1");

2.3.7)、MessageExt的结构

对于发送方来说上述Message的定义已经足够,但对于RocketMq的这个处理流程来说,还需要更多的字段来记录消息的一些必要内容。

比如消息的ID,创建时间,存储时间等。这就是MessageExt

public class MessageExt extends Message {

private static final long serialVersionUID = 5720810158625748049L;

private int queueId;

private int storeSize;

private long queueOffset;

private int sysFlag;

private long bornTimestamp;

private SocketAddress bornHost;

private long storeTimestamp;

private SocketAddress storeHost;

private String msgId;

private long commitLogOffset;

private int bodyCRC;

private int reconsumeTimes;

private long preparedTransactionOffset;

}

字段

用途

queueId

记录MessageQueue编号,消息会被发送到Topic下的MessageQueue

storeSize

记录消息在Broker存盘大小

queueOffset

记录在ConsumeQueue中的偏移

sysFlag

记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识

bornTimestamp

消息创建时间,在Producer发送消息时设置

storeHost

记录存储该消息的Broker地址

msgId

消息Id

commitLogOffset

记录在Broker中存储偏移

bodyCRC

消息内容CRC校验值

reconsumeTimes

消息重试消费次数

preparedTransactionOffset

事务详细相关字段

2.4、RockerMq常用命令

常用端口:

##防火墙中记得开启

nameserver用:9876

producer用: 10909

consumer用:10911

阿里云端口安全组配置:

2.4.1)、启动命令

启动 NameServer

# 1、启动nameserver

nohup sh mqnamesrv &

#或

nohup sh mqnamesrv -n 你的IP:9876 &

# 2、查看启动日志

tail -f ~/logs/rocketmqlogs/namesrv.log

启动Broker

# 1.启动Broker

nohup sh mqbroker -n localhost:9876 &

#或

nohup sh mqbroker -n 你namesrv的IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &

# 2.查看启动日志

tail -f ~/logs/rocketmqlogs/broker.log

问题-注意:

​ 由于RockerMq默认的内存都比较大,启动的时候会因为内存不足而导致失败。需要编辑如下两个配置文件,修改JVM内存的大小。

## 编辑runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

# 改为:

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

## 编辑runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

# 改为:

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

消息测试:

## consumer接收消息

# 1.设置环境变量

export NAMESRV_ADDR=localhost:9876

# 2.接收消息

sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

#######################

## producer发送消息

# 1.设置环境变量

export NAMESRV_ADDR=localhost:9876

# 2.使用安装包的Demo发送消息

sh tools.sh org.apache.rocketmq.example.quickstart.Producer

注:可以用 jps 查看 启动状态

2.4.2)、关闭命令

# 1.关闭NameServer

sh mqshutdown namesrv

# 2.关闭Broker

sh mqshutdown broker

注:可以用 jps 查看 启动状态

2.4.3)、mqadmin命令集合

# 在RocketMQ的bin目录下有一个mqadmin脚本,它充当着控制台的角色,可以用来完成我们常用的操作。

# 如不喜欢命令可安装第三方的可视化操控界面工具

## 查看有哪些命令

sh mqadmin

## 查看具体命令怎么用

sh mqadmin help xxx

2.4.4)、group命令

①、创建订阅组group

sh mqadmin updateSubGroup -n localhost:9876 -b localhost:10911 -g myGroup

# 参数: -b broker地址 -c 集群名称 -g 订阅组名称 -n nameserve服务地址列表,格式ip:port;ip:port

②、删除订阅组Group

sh mqadmin deleteSubGroup -n localhost:9876 -b localhost:10911 -g myGroup

# 参数: -b broker地址 -c 集群名称 -g 订阅组名称 -n nameserve服务地址列表,格式ip:port;ip:port

③、查看消费组Group

sh mqadmin consumerProgress -n localhost:9876

2.4.5)、Topic命令

①、创建Topic

sh mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t myTopic

# 参数 -n为nameServe服务地址 -b为broker服务地址 -t为topic的名称

②、删除Topic

sh mqadmin deleteTopic -n localhost:9876  -t myTopic

#参数 -n nameserve 服务地址列表,格式ip:port;ip:port; -t为topic名称

③、查询所有Topic

sh mqadmin topicList -n localhost:9876

# 参数 -n为nameServe服务地址

④、查看topic统计信息

sh mqadmin topicStatus -n localhost:9876 -t myTopic

# 参数 -n为nameServe服务地址 -t为topic的名字

其他:

其他命令就不在这里叙述,可以查看相关帮助命令,如:sh mqadmin 或 sh mqadmin help。

如不喜欢命令可安装第三方的可视化操控界面工具

2.5、常用配置信息

核心配置信息:

#所属集群名字

brokerClusterName=rocketmq-cluster

#broker名字,注意此处不同的配置文件填写的不一样

brokerName=broker-a

#0 表示 Master,>0 表示 Slave

brokerId=0

#nameServer地址,分号分割

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数

defaultTopicQueueNums=4

#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭

autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口

listenPort=10911

#删除文件时间点,默认凌晨 4点

deleteWhen=04

#文件保留时间,默认 48 小时

fileReservedTime=120

#commitLog每个文件的大小默认1G

mapedFileSizeCommitLog=1073741824

#ConsumeQueue每个文件默认存30W条,根据业务情况调整

mapedFileSizeConsumeQueue=300000

#destroyMapedFileIntervalForcibly=120000

#redeleteHangedFileInterval=120000

#检测物理文件磁盘空间

diskMaxUsedSpaceRatio=88

#存储路径

storePathRootDir=/usr/local/rocketmq/store

#commitLog 存储路径

storePathCommitLog=/usr/local/rocketmq/store/commitlog

#消费队列存储路径存储路径

storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue

#消息索引存储路径

storePathIndex=/usr/local/rocketmq/store/index

#checkpoint 文件存储路径

storeCheckpoint=/usr/local/rocketmq/store/checkpoint

#abort 文件存储路径

abortFile=/usr/local/rocketmq/store/abort

#限制的消息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker 的角色

#- ASYNC_MASTER 异步复制Master

#- SYNC_MASTER 同步双写Master

#- SLAVE

brokerRole=SYNC_MASTER

#刷盘方式

#- ASYNC_FLUSH 异步刷盘

#- SYNC_FLUSH 同步刷盘

flushDiskType=SYNC_FLUSH

#checkTransactionMessageEnable=false

#发消息线程池数量

#sendMessageThreadPoolNums=128

#拉消息线程池数量

#pullMessageThreadPoolNums=128

其他配置信息:

参数名

描述

默认参数(时间为单位ms,数据单位为byte)

rocketmqHome

RockerMQ主目录,默认用户主目录

 

namesrvAddr

NameServer地址

 

kvConfigPath

kv配置文件路径,包含顺序消息主题的配置信息

 

configStorePath

NameServer配置文件路径,建议使用-c指定NameServer配置文件路径

 

clusterTest

是否开启集群测试,默认为false

 

orderMessageEnable

是否支持顺序消息,默认为false

 

accessMessageInMemoryMaxRatio

访问消息在内存中比率,默认为40

40

adminBrokerThreadPoolNums

服务端处理控制台管理命令线程池线程数量

16

autoCreateSubscriptionGroup

是否自动创建消费组

true

autoCreateTopicEnable

是否自动创建主题

true

bitMapLengthConsumeQueueExt

ConsumeQueue扩展过滤bitmap大小

112

brokerClusterName

Broker集群名称

TestCluster

brokerFastFailureEnable

是否支持broker快速失败 如果为true表示会立即清除发送消息线程池,消息拉取线程池中排队任务 ,直接返回系统错误

true

brokerId

brokerID 0表示主节点 大于0表示从节点

0

brokerIP1

Broker服务地址

 

brokerIP2

BrokerHAIP地址,供slave同步消息的地址

 

brokerName

Broker服务器名称morning服务器hostname

broker-a

brokerPermission

Broker权限 默认为6表示可读可写

6

brokerRole

broker角色,分为 ASYNC_MASTER SYNC_MASTER, SLAVE

ASYNC_MASTER

brokerTopicEnable

broker名称是否可以用做主体使用

true

channelNotActiveInterval

 

60000

checkCRCOnRecover

文件恢复时是否校验CRC

true

cleanFileForciblyEnable

是否支持强行删除过期文件

true

cleanResourceInterval

清除过期文件线程调度频率

10000

clientAsyncSemaphoreValue

 

65535

clientCallbackExecutorThreads

 

8

clientChannelMaxIdleTimeSeconds

 

120

clientCloseSocketIfTimeout

 

false

clientManagerThreadPoolQueueCapacity

客户端管理线程池任务队列初始大小

1000000

clientManageThreadPoolNums

服务端处理客户端管理(心跳 注册 取消注册线程数量)

32

clientOnewaySemaphoreValue

 

65535

clientPooledByteBufAllocatorEnable

 

false

clientSocketRcvBufSize

客户端socket接收缓冲区大小

131072

clientSocketSndBufSize

客户端socket发送缓冲区大小

131072

clientWorkerThreads

 

4

clusterTopicEnable

集群名称是否可用在主题使用

true

commercialBaseCount

 

1

commercialBigCount

 

1

commercialEnable

 

true

commercialTimerCount

 

1

commercialTransCount

 

1

commitCommitLogLeastPages

一次提交至少需要脏页的数量,默认4页,针对 commitlog文件

4

commitCommitLogThoroughInterval

Commitlog两次提交的最大间隔,如果超过该间隔,将忽略commitCommitLogLeastPages直接提交

200

commitIntervalCommitLog

commitlog提交频率

200

compressedRegister

 

false

connectTimeoutMillis

链接超时时间

3000

consumerFallbehindThreshold

消息消费堆积阈值默认16GB在disableConsumeifConsumeIfConsumerReadSlowly为true时生效

17179869184

consumerManagerThreadPoolQueueCapacity

消费管理线程池任务队列大小

1000000

consumerManageThreadPoolNums

服务端处理消费管理 获取消费者列表 更新消费者进度查询消费进度等

32

debugLockEnable

是否支持 PutMessage Lock锁打印信息

false

defaultQueryMaxNum

查询消息默认返回条数,默认为32

32

defaultTopicQueueNums

主体在一个broker上创建队列数量

8

deleteCommitLogFilesInterval

删除commitlog文件的时间间隔,删除一个文件后等一下再删除一个文件

100

deleteConsumeQueueFilesInterval

删除consumequeue文件时间间隔

100

deleteWhen

磁盘文件空间充足情况下,默认每天什么时候执行删除过期文件,默认04表示凌晨4点

04

destroyMapedFileIntervalForcibly

销毁MappedFile被拒绝的最大存活时间,默认120s。清除过期文件线程在初次销毁mappedfile时,如果该文件被其他线程引用,引用次数大于0.则设置MappedFile的可用状态为false,并设置第一次删除时间,下一次清理任务到达时,如果系统时间大于初次删除时间加上本参数,则将ref次数一次减1000,知道引用次数小于0,则释放物理资源

120000

disableConsumeIfConsumerReadSlowly

如果消费组消息消费堆积是否禁用该消费组继续消费消息

false

diskFallRecorded

是否统计磁盘的使用情况,默认为true

true

diskMaxUsedSpaceRatio

commitlog目录所在分区的最大使用比例,如果commitlog目录所在的分区使用比例大于该值,则触发过期文件删除

75

duplicationEnable

是否允许重复复制,默认为 false

false

enableCalcFilterBitMap

是否开启比特位映射,这个属性不太明白

false

enableConsumeQueueExt

是否启用ConsumeQueue扩展属性

false

enablePropertyFilter

是否支持根据属性过滤 如果使用基于标准的sql92模式过滤消息则改参数必须设置为true

false

endTransactionPoolQueueCapacity

处理提交和回滚消息线程池线程队列大小

100000

endTransactionThreadPoolNums

处理提交和回滚消息线程池

24

expectConsumerNumUseFilter

布隆过滤器参数

32

fastFailIfNoBufferInStorePool

从 transientStorepool中获取 ByteBuffer是否支持快速失败

false

fetchNamesrvAddrByAddressServer

是否支持从服务器获取nameServer

false

fileReservedTime

文件保留时间,默认72小时,表示非当前写文件最后一次更新时间加上filereservedtime小与当前时间,该文件将被清理

120

filterDataCleanTimeSpan

清除过滤数据的时间间隔

86400000

filterServerNums

broker服务器过滤服务器数量

0

filterSupportRetry

消息过滤是否支持重试

false

flushCommitLogLeastPages

一次刷盘至少需要脏页的数量,针对commitlog文件

4

flushCommitLogThoroughInterval

commitlog两次刷盘的最大间隔,如果超过该间隔,将fushCommitLogLeastPages要求直接执行刷盘操作

10000

flushCommitLogTimed

表示await方法等待FlushIntervalCommitlog,如果为true表示使用Thread.sleep方法等待

false

flushConsumeQueueLeastPages

一次刷盘至少需要脏页的数量,默认2页,针对 Consume文件

2

flushConsumeQueueThoroughInterval

Consume两次刷盘的最大间隔,如果超过该间隔,将忽略

60000

flushConsumerOffsetHistoryInterval

fushConsumeQueueLeastPages直接刷盘

60000

flushConsumerOffsetInterval

持久化消息消费进度 consumerOffse.json文件的频率ms

5000

flushDelayOffsetInterval

延迟队列拉取进度刷盘间隔。默认10s

10000

flushDiskType

刷盘方式,默认为 ASYNC_FLUSH(异步刷盘),可选值SYNC_FLUSH(同步刷盘)

ASYNC_FLUSH

flushIntervalCommitLog

commitlog刷盘频率

500

flushIntervalConsumeQueue

consumuQueue文件刷盘频率

1000

flushLeastPagesWhenWarmMapedFile

用字节0填充整个文件的,每多少页刷盘一次。默认4096页,异步刷盘模式生效

4096

forceRegister

是否强制注册

true

haHousekeepingInterval

Master与save长连接空闲时间,超过该时间将关闭连接

20000

haListenPort

Master监听端口,从服务器连接该端口,默认为10912

10912

haMasterAddress

Master服务器IP地址与端口号

 

haSendHeartbeatInterval

Master与Slave心跳包发送间隔

5000

haSlaveFallbehindMax

允许从服务器落户的最大偏移字节数,默认为256M。超过该值则表示该Slave不可用

268435456

haTransferBatchSize

一次HA主从同步传输的最大字节长度,默认为32K

32768

heartbeatThreadPoolNums

心跳线程池线程数

8

heartbeatThreadPoolQueueCapacity

心跳线程队列数量

50000

highSpeedMode

当前版本未使用

false

listenPort

服务端监听端口

10911

longPollingEnable

是否开启长轮训

true

mapedFileSizeCommitLog

单个conmmitlog文件大小默认1GB

1073741824

mapedFileSizeConsumeQueue

单个consumequeue文件大小默认30W*20表示单个Consumequeue文件中存储30W个ConsumeQueue条目

6000000

mappedFileSizeConsumeQueueExt

ConsumeQueue扩展文件大小默认48MB

50331648

maxDelayTime

当前版本未使用

40

maxErrorRateOfBloomFilter

布隆过滤器参数

20

maxHashSlotNum

单个索引文件hash槽的个数,默认为五百万

5000000

maxIndexNum

单个索引文件索引条目的个数,默认为两千万

20000000

maxMessageSize

默认允许的最大消息体默认4M

4194304

maxMsgsNumBatch

一次查询消息最大返回消息条数,默认64条

64

maxTransferBytesOnMessageInDisk

一次服务消息端消息拉取,消息在磁盘中传输允许的最大字节

65536

maxTransferBytesOnMessageInMemory

一次服务端消息拉取,消息在内存中传输允许的最大传输字节数默认256kb

262144

maxTransferCountOnMessageInDisk

一次消息服务端消息拉取,消息在磁盘中传输允许的最大条数,默认为8条

8

maxTransferCountOnMessageInMemory

一次服务消息拉取,消息在内存中传输运行的最大消息条数,默认为32条

32

messageDelayLevel

延迟队列等级(s=秒,m=分,h=小时)

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

messageIndexEnable

是否支持消息索引文件

true

messageIndexSafe

消息索引是否安全,默认为 false,文件恢复时选择文件检测点(commitlog.consumeque)的最小的与文件最后更新对比,如果为true,文件恢复时选择文件检测点保存的索引更新时间作为对比

false

messageStorePlugIn

消息存储插件地址默认为空字符串

 

namesrvAddr

nameServer地址

 

notifyConsumerIdsChangedEnable

消费者数量变化后是否立即通知RebalenceService线程,以便马上进行重新负载

true

offsetCheckInSlave

从服务器是否坚持 offset检测

false

osPageCacheBusyTimeOutMills

putMessage锁占用超过该时间,表示 PageCache忙

1000

pullMessageThreadPoolNums

服务端处理消息拉取线程池线程数量 默认为16加上当前操作系统CPU核数的两倍

32

pullThreadPoolQueueCapacity

消息拉去线程池任务队列初始大小

100000

putMsgIndexHightWater

当前版本未使用

600000

queryMessageThreadPoolNums

服务端处理查询消息线程池数量默认为8加上当前操作系统CPU核数的两倍

16

queryThreadPoolQueueCapacity

查询消息线程池任务队列初始大小

20000

redeleteHangedFileInterval

重试删除文件间隔,配合destorymapedfileintervalforcibly

120000

regionId

消息区域

DefaultRegion

registerBrokerTimeoutMills

注册broker超时时间

6000

registerNameServerPeriod

broker注册频率 大于1分钟为1分钟小于10秒为10秒

30000

rejectTransactionMessage

是否拒绝事物消息

false

rocketmqHome

RocketMQ主目录

/home/rocketmq/rocketmq-all-4.3.2-bin-release

sendMessageThreadPoolNums

服务端处理消息发送线程池数量

1

sendThreadPoolQueueCapacity

消息发送线程池任务队列初始大小

10000

serverAsyncSemaphoreValue

异步消息发送最大并发度

64

serverCallbackExecutorThreads

netty public任务线程池个数,netty网络设计没根据业务类型会创建不同线程池毛笔如处理发送消息,消息消费心跳检测等。如果业务类型(RequestCode)未注册线程池,则由public线程池执行

0

serverChannelMaxIdleTimeSeconds

网络连接最大空闲时间。如果链接空闲时间超过此参数设置的值,连接将被关闭

120

serverOnewaySemaphoreValue

send oneway消息请求并发度

256

serverPooledByteBufAllocatorEnable

ByteBuffer是否开启缓存

true

serverSelectorThreads

IO线程池线程个数,主要是NameServer.broker端解析请求,返回相应的线程个数,这类县城主要是处理网络请求的,解析请求包。然后转发到各个业务线程池完成具体的业务无操作,然后将结果在返回调用方

3

serverSocketRcvBufSize

netty网络socket接收缓存区大小16MB

131072

serverSocketSndBufSize

netty网络socket发送缓存区大小16MB

131072

serverWorkerThreads

netty业务线程池个数

8

shortPollingTimeMills

短轮训等待时间

1000

slaveReadEnable

从节点是否可读

false

startAcceptSendRequestTimeStamp

 

0

storePathCommitLog

Commitlog存储目录默认为${storePathRootDir}/commitlog

/home/rocketmq/store/commitlog

storePathRootDir

broker存储目录 默认为用户的主目录/store

/home/rocketmq/store

syncFlushTimeout

同步刷盘超时时间

5000

traceOn

 

true

transactionCheckInterval

事物回查周期

60000

transactionCheckMax

事物回查次数

15

transactionTimeOut

事物回查超时时间

6000

transferMsgByHeap

消息传输是否使用堆内存

true

transientStorePoolEnable

Commitlog是否开启 transientStorePool机制,默认为 false

false

transientStorePoolSize

transientStorePool中缓存 ByteBuffer个数,默认5个

5

useEpollNativeSelector

是否启用Epoll IO模型。Linux环境建议开启

false

useReentrantLockWhenPutMessage

消息存储到commitlog文件时获取锁类型,如果为true使用ReentrantLock否则使用自旋锁

false

useTLS

是否使用安全传输层协议

false

waitTimeMillsInHeartbeatQueue

清理broker心跳线程等待时间

31000

waitTimeMillsInPullQueue

清除消息拉取线程池任务队列的等待时间。如果系统时间减去任务放入队列中的时间小于waitTimeMillsInPullQueue,本次请求任务暂时不移除该任务

5000

waitTimeMillsInSendQueue

清除发送线程池任务队列的等待时间。如果系统时间减去任务放入队列中的时间小于waitTimeMillsInSendQueue,本次请求任务暂时不移除该任务

200

waitTimeMillsInTransactionQueue

清理提交和回滚消息线程队列等待时间

3000

warmMapedFileEnable

是否温和地使用 MappedFile如果为true,将不强制将内存映射文件锁定在内存中

false

connectWhichBroker

FilterServer连接的Broker地址

 

filterServerIP

FilterServerIP地址,默认为本地服务器IP

 

compressMsgBodyOverHowmuch

如果消息Body超过该值则启用

 

zipCompresslevel

Zip压缩方式,默认为5,详细定义请参考java.util.Deflate中的定义

 

clientUploadFilterClassEnable

是否支持客户端上传 FilterClass代码

 

filterClassRepertoryUrl

filterClass服务地址,如果 clientUploadFilterClassEnable为false,则需要提供一个地址从该服务器获取过滤类的代码

 

fsServerAsyncSemaphorevalue

FilterServer异步请求并发度,默认为2048

 

fsServerCallbackExecutorThreads

处理回调任务的线程池数量,默认为64

 

fsServerWorkerThreads

远程服务调用线程池数量,默认为64

 

以上是 1、RocketMq系列,第一章:安装与快速入门 的全部内容, 来源链接: utcz.com/z/512254.html

回到顶部