Rocketmq中的Broker启动流程(四)

编程


程序的启动入口在org.apache.rocketmq.broker.BrokerStartup中,非常的简单,简单到要吐一口老血

 public static void main(String[] args) {

start(createBrokerController(args)); // 创建BrokerController,然后启动,是不是简单的要死

}

下面分析一下这个两个流程中还有哪些东西


1.创建BrokerController

 public static BrokerController createBrokerController(String[] args) {

final BrokerConfig brokerConfig = new BrokerConfig(); // 1.1 创建Broker配置

final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 1.2 创建netty服务器配置

final NettyClientConfig nettyClientConfig = new NettyClientConfig();// 1.3 创建netty客户端配置

nettyServerConfig.setListenPort(10911); //设置Broker端端口号

final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); // 1.4 有关消息存储的配置

String namesrvAddr = brokerConfig.getNamesrvAddr();

messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); // 1.5 数据同步的端口号

final BrokerController controller = new BrokerController(

brokerConfig,

nettyServerConfig,

nettyClientConfig,

messageStoreConfig); // 1.6 创建BrokerController

boolean initResult = controller.initialize(); // 1.7 BrokerController的初始化

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

private volatile boolean hasShutdown = false;

private AtomicInteger shutdownTimes = new AtomicInteger(0);

@Override

public void run() {

synchronized (this) {

log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());

if (!this.hasShutdown) {

this.hasShutdown = true;

long beginTime = System.currentTimeMillis();

controller.shutdown();

long consumingTimeTotal = System.currentTimeMillis() - beginTime;

log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);

}

}

}

}, "ShutdownHook")); // 1.8 向JVM中注册Hook函数

return controller;

}

以上可以看出,从1.1 到 1.5 是设置各种配置参数, 1.6 创建Broker ,1.7 是Broker的初始化过程,1.8是注册Hook函数,那么这个时候能我们就创建了Broker了,下面看下启动流程。


2.Broker的启动

 public static BrokerController start(BrokerController controller) {

controller.start(); // 启动,看具体的实现

return controller;

}

public void start() throws Exception {

if (this.messageStore != null) {

this.messageStore.start(); // 2.1 启动消息存储

}

if (this.remotingServer != null) {

this.remotingServer.start(); // 2.2 启动路由服务

}

if (this.fastRemotingServer != null) {

this.fastRemotingServer.start(); // 2.3 快速路由服务,主要用于扫描生产者和消费者是否还存活

}

if (this.fileWatchService != null) {

this.fileWatchService.start();

}

if (this.brokerOuterAPI != null) {

this.brokerOuterAPI.start(); // 2.4 broker的对外api

}

if (this.pullRequestHoldService != null) {

this.pullRequestHoldService.start(); // 2.5 启动拉取消息的辅助服务,主要看消息是否到达

}

if (this.clientHousekeepingService != null) {

this.clientHousekeepingService.start(); // 2.6 客户端心跳服务

}

if (this.filterServerManager != null) {

this.filterServerManager.start(); // 2.7 过滤消息服务

}

if (!messageStoreConfig.isEnableDLegerCommitLog()) {

startProcessorByHa(messageStoreConfig.getBrokerRole());

handleSlaveSynchronize(messageStoreConfig.getBrokerRole());

}

this.registerBrokerAll(true, false, true); // 2.8 注册Broker

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

} catch (Throwable e) {

log.error("registerBrokerAll Exception", e);

}

}

}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); // 2.9 启动定时任务

if (this.brokerStatsManager != null) {

this.brokerStatsManager.start(); //2.10 Broker状态管理

}

if (this.brokerFastFailure != null) {

this.brokerFastFailure.start(); // 2.11 清理过期请求

}

}

可以看到,在Broker启动的过程中有多复杂,消息存储的启动,心跳的检查,注册Broker,扫描过期的请求,监听消息是否到达等等功能,在下几篇的文章中我们在一点一点的分析

参考资料

http://rocketmq.apache.org/

以上是 Rocketmq中的Broker启动流程(四) 的全部内容, 来源链接: utcz.com/z/514283.html

回到顶部