RocketMQ 中的 Broker 启动流程续(五)
Broker 的创建过程如下:
/**
 * Creates the broker's configuration objects, builds a {@code BrokerController} from them,
 * initializes it and registers a JVM shutdown hook that shuts the controller down.
 *
 * <p>If initialization reports failure (or cannot clone the Netty server configuration),
 * the controller is shut down and the JVM exits with status {@code -3} instead of handing
 * a half-initialized controller back to the caller.
 *
 * @param args command-line arguments; not consulted by this simplified version
 * @return the initialized controller
 */
public static BrokerController createBrokerController(String[] args) {
    // 1.1 Broker configuration
    final BrokerConfig brokerConfig = new BrokerConfig();
    // 1.2 Netty server configuration
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // 1.3 Netty client configuration
    final NettyClientConfig nettyClientConfig = new NettyClientConfig();
    // Port the broker listens on.
    nettyServerConfig.setListenPort(10911);
    // 1.4 Message store configuration
    final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
    // 1.5 HA listen port (the data-synchronization port): broker listen port + 1
    messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

    // 1.6 Create the BrokerController
    final BrokerController controller = new BrokerController(
        brokerConfig,
        nettyServerConfig,
        nettyClientConfig,
        messageStoreConfig);

    // 1.7 Initialize the BrokerController. initialize() declares CloneNotSupportedException;
    // it is handled here so this method's signature stays unchanged.
    boolean initResult;
    try {
        initResult = controller.initialize();
    } catch (CloneNotSupportedException e) {
        log.error("Failed to initialize BrokerController", e);
        initResult = false;
    }
    if (!initResult) {
        // Do not return a controller whose initialization failed.
        controller.shutdown();
        System.exit(-3);
    }

    // 1.8 Register a JVM shutdown hook
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        // Ensures controller.shutdown() runs at most once even if run() is invoked again.
        private volatile boolean hasShutdown = false;
        // Counts how many times the hook has been invoked (for logging only).
        private final AtomicInteger shutdownTimes = new AtomicInteger(0);

        @Override
        public void run() {
            synchronized (this) {
                log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                if (!this.hasShutdown) {
                    this.hasShutdown = true;
                    long beginTime = System.currentTimeMillis();
                    controller.shutdown();
                    long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                    log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                }
            }
        }
    }, "ShutdownHook"));

    return controller;
}
1.1 创建Broker配置
// Default values of the BrokerConfig fields.

// RocketMQ home directory: system property first, environment variable as fallback.
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// Name server address: system property first, environment variable as fallback.
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
// Broker addresses; both default to the local address.
private String brokerIP1 = RemotingUtil.getLocalAddress();
private String brokerIP2 = RemotingUtil.getLocalAddress();
// Broker name; defaults to the local host name.
private String brokerName = localHostName();
// Name of the cluster this broker belongs to.
private String brokerClusterName = "DefaultCluster";
// Broker id; defaults to the master id constant.
private long brokerId = MixAll.MASTER_ID;
// Broker permission bitmask; readable and writable by default.
// NOTE(review): the original note claims only a master can store messages — not shown here, confirm.
private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
// Default number of queues per topic.
private int defaultTopicQueueNums = 8;
// Whether topics are created automatically.
private boolean autoCreateTopicEnable = true;
// NOTE(review): the original note says this "controls read/write permission"; the name suggests
// it toggles a cluster-level topic instead — confirm against the usage site.
private boolean clusterTopicEnable = true;
// NOTE(review): the original note says this "controls read/write permission"; the name suggests
// it toggles a broker-level topic instead — confirm against the usage site.
private boolean brokerTopicEnable = true;
// Presumably whether subscription groups are created automatically (original note was truncated).
private boolean autoCreateSubscriptionGroup = true;
// NOTE(review): purpose unknown to the original author; presumably names message-store plug-ins — confirm.
private String messageStorePlugIn = "";
// Default topic used for message tracing.
private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
// Message tracing is disabled by default.
private boolean traceTopicEnable = false;
// Threads in the send-message pool.
private int sendMessageThreadPoolNums = 1; // previously: 16 + Runtime.getRuntime().availableProcessors() * 4;
// Threads in the pull-message pool.
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
// Threads in the query-message pool.
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
// Threads in the admin (back-office) pool.
private int adminBrokerThreadPoolNums = 16;
// Threads in the client-management pool.
private int clientManageThreadPoolNums = 32;
// Threads in the consumer-management pool.
private int consumerManageThreadPoolNums = 32;
// Threads in the heartbeat pool: the CPU count, capped at 32.
private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
// Threads in the end-transaction pool; per the original note, used to commit or roll back
// transactional messages.
private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
// Period (ms) of the scheduled consumer-offset persist task set up in BrokerController.initialize().
private int flushConsumerOffsetInterval = 1000 * 5;
// NOTE(review): usage not visible in this article; value is 60 s in ms.
private int flushConsumerOffsetHistoryInterval = 1000 * 60;
// Whether transactional messages are rejected.
private boolean rejectTransactionMessage = false;
// Whether the name server address is fetched from an address server.
private boolean fetchNamesrvAddrByAddressServer = false;
// Capacities of the bounded queues backing each thread pool.
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
// Number of filter servers. NOTE(review): purpose unknown to the original author — confirm.
private int filterServerNums = 0;
// Whether long polling is enabled (the original note said "long connection").
private boolean longPollingEnable = true;
// Short-polling time in milliseconds.
private long shortPollingTimeMills = 1000;
// Presumably whether consumers are notified when the consumer id set changes (which triggers
// a rebalance, per the original note) — confirm.
private boolean notifyConsumerIdsChangedEnable = true;
// Whether messages are transferred through heap memory.
private boolean transferMsgByHeap = true;
// Timeout (ms) for registering the broker.
private int registerBrokerTimeoutMills = 6000;
// Whether reading from a slave is enabled.
private boolean slaveReadEnable = false;
// Message backlog threshold: 16 * 1024^3 (presumably bytes, i.e. 16 GiB — confirm).
private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
// Whether broker fast-failure is enabled.
private boolean brokerFastFailureEnable = true;
1.2 创建netty服务器配置
// Default values of the NettyServerConfig fields.

// Listen port. Defaults to 8888; the broker overrides it with 10911 via setListenPort(10911).
private int listenPort = 8888;
// Number of worker threads.
private int serverWorkerThreads = 8;
// Thread count for the callback executor. NOTE(review): the meaning of 0 is not shown here — confirm.
private int serverCallbackExecutorThreads = 0;
// Number of selector threads.
private int serverSelectorThreads = 3;
// Semaphore permits for one-way sends.
private int serverOnewaySemaphoreValue = 256;
// Semaphore permits for asynchronous sends (the original note said "synchronous", which
// contradicts the field name — confirm).
private int serverAsyncSemaphoreValue = 64;
// Maximum channel idle time in seconds (the original note calls this the heartbeat interval).
private int serverChannelMaxIdleTimeSeconds = 120;
// Socket send buffer size.
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// Socket receive buffer size.
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// Whether the pooled ByteBuf allocator is used.
private boolean serverPooledByteBufAllocatorEnable = true;
// Whether the native epoll selector is used instead of the default I/O model.
private boolean useEpollNativeSelector = false;
1.3 创建netty客户端配置
// Default values of the NettyClientConfig fields.

// Number of worker threads.
private int clientWorkerThreads = 4;
// Thread count for the callback executor; defaults to the number of available processors.
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
// Semaphore permits for one-way and asynchronous requests, taken from NettySystemConfig.
private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
// Connect timeout in milliseconds.
private int connectTimeoutMillis = 3000;
// Maximum channel idle time in seconds (the original note calls this the heartbeat time).
private int clientChannelMaxIdleTimeSeconds = 120;
// Socket send / receive buffer sizes, taken from NettySystemConfig.
private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// Whether the pooled ByteBuf allocator is used.
private boolean clientPooledByteBufAllocatorEnable = false;
// Presumably whether the socket is closed when a request times out — confirm.
private boolean clientCloseSocketIfTimeout = false;
1.4 有关消息存储的配置
有关存储的配置参数太多,这里不再逐一列出,大家可以对照 MessageStoreConfig 的源码自行查看。
1.6 创建BrokerController
// Body of the BrokerController constructor: stores the four configuration objects, then
// creates the managers, processors, bounded work queues and helper services.

this.brokerConfig = brokerConfig; // broker configuration
// Must be assigned before setStoreHost(...) below, which reads the listen port through
// getNettyServerConfig().
this.nettyServerConfig = nettyServerConfig; // Netty server configuration
this.nettyClientConfig = nettyClientConfig; // Netty client configuration
this.messageStoreConfig = messageStoreConfig; // message store configuration

// Managers, processors and listeners. Most take this controller as their owner.
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
this.filterServerManager = new FilterServerManager(this);
this.slaveSynchronize = new SlaveSynchronize(this);

// Bounded work queues, one per thread pool, sized from the broker configuration.
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
// Store host = brokerIP1 plus the Netty server listen port.
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
this.brokerFastFailure = new BrokerFastFailure(this);
// Aggregates the four configuration objects together with the broker config file path.
this.configuration = new Configuration(
    log,
    BrokerPathConfigHelper.getBrokerConfigPath(),
    this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
1.7 BrokerController的初始化
/**
 * Initializes this controller: loads the persisted managers, builds and loads the message
 * store, and - only if everything so far succeeded - creates the remoting servers and
 * thread pools, registers the request processors and schedules the periodic tasks.
 *
 * @return {@code true} if every load step succeeded; {@code false} otherwise, in which case
 *         no servers, executors or scheduled tasks are created
 * @throws CloneNotSupportedException if the Netty server configuration cannot be cloned for
 *         the fast remoting server
 */
public boolean initialize() throws CloneNotSupportedException {
// Load the persisted managers; each step is skipped once an earlier one has failed.
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
if (result) {
try {
// Default message store.
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// When the DLedger commit log is enabled, register a role-change handler with its leader elector.
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
// Broker statistics, built on the default store (before it is replaced by the plugin build below).
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
// Load plugins: the factory returns the store to use from here on.
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
// Put the bitmap-calculating dispatcher at the head of the dispatcher list.
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
// Flagged as important by the original author: said to load message positions and the
// queues of each topic - NOTE(review): confirm against MessageStore.load().
// Short-circuits, so messageStore is not touched when an earlier step failed.
result = result && this.messageStore.load();
if (result) {
// Create the remoting server.
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
// The fast remoting server uses a clone of the server config listening on (listen port - 2).
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
// Create the fast remoting server.
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
// Send-message thread pool (core size == max size, 60 s keep-alive).
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
// Pull-message thread pool.
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
// Query-message thread pool.
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
// Admin thread pool (fixed size).
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
// Client-management thread pool.
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
// Heartbeat thread pool.
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
// Thread pool for transaction commit / rollback.
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
// Consumer-management thread pool (fixed size).
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
// Register the request processors.
this.registerProcessor();
// Delay until the time returned by computeNextMorningTimeMillis(), then repeat every 24 h.
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
// Record broker stats once per period; the original note says this prints a previous
// day's message totals - NOTE(review): confirm against BrokerStats.record().
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
// Persist consumer offsets every flushConsumerOffsetInterval ms, starting after 10 s.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// Persist consumer filter data every 10 s, starting after 10 s.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
// Run protectBroker() every 3 minutes.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
// Run printWaterMark() every second, starting after 10 s.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
// Log how many bytes dispatch is behind the commit log, every 60 s, starting after 10 s.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
// Name server address: use the configured one if present; otherwise, when enabled,
// fetch it from the address server every 2 minutes (first run after 10 s).
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// Master/slave handling applies only when the DLedger commit log is not enabled.
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
// Slave: use the configured HA master address when one of at least 6 characters is set;
// otherwise flag that the address must be updated periodically.
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
// Non-slave role: run printMasterAndSlaveDiff() every 60 s, starting after 10 s.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
// Final set-up steps: transactions, ACL and RPC hooks.
initialTransaction();
initialAcl();
initialRpcHooks();
}
return result;
}
可以看到,在 Broker 的创建过程中,会完成各种参数的配置、各类线程池的创建以及各种定时任务的开启等工作。
以上是 Rocketmq中的Broker启动流程续(五) 的全部内容, 来源链接: utcz.com/z/514373.html