Rocketmq中的Broker启动流程续(五)

编程


Broker的创建过程如下:

 /**
  * Builds a {@link BrokerController} from default configuration objects, initializes it and
  * registers a JVM shutdown hook that shuts the controller down.
  *
  * <p>If initialization returns {@code false} or throws, the controller is shut down and the
  * JVM exits with status {@code -3}, so a half-initialized controller is never handed back to
  * the caller.
  *
  * @param args command-line arguments; not consulted by this simplified version
  * @return the initialized controller
  */
 public static BrokerController createBrokerController(String[] args) {
     final BrokerConfig brokerConfig = new BrokerConfig(); // 1.1 broker configuration
     final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 1.2 Netty server configuration
     final NettyClientConfig nettyClientConfig = new NettyClientConfig(); // 1.3 Netty client configuration

     nettyServerConfig.setListenPort(10911); // broker-side listen port

     final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); // 1.4 message-store configuration

     // 1.5 the data-synchronization (HA) port is the listen port + 1
     messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

     // 1.6 create the BrokerController
     final BrokerController controller = new BrokerController(
         brokerConfig,
         nettyServerConfig,
         nettyClientConfig,
         messageStoreConfig);

     // 1.7 initialize the BrokerController; a thrown exception counts as a failed initialization
     boolean initResult;
     try {
         initResult = controller.initialize();
     } catch (CloneNotSupportedException e) {
         log.error("Failed to initialize BrokerController", e);
         initResult = false;
     }
     if (!initResult) {
         // Release whatever the controller already created before giving up.
         controller.shutdown();
         System.exit(-3);
     }

     // 1.8 register a hook with the JVM so the controller is shut down on exit
     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
         private volatile boolean hasShutdown = false;
         private final AtomicInteger shutdownTimes = new AtomicInteger(0);

         @Override
         public void run() {
             synchronized (this) {
                 log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                 // Guard so controller.shutdown() runs at most once per hook instance.
                 if (!this.hasShutdown) {
                     this.hasShutdown = true;
                     long beginTime = System.currentTimeMillis();
                     controller.shutdown();
                     long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                     log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                 }
             }
         }
     }, "ShutdownHook"));

     return controller;
 }

1.1 创建Broker配置

	// RocketMQ home location: read from the system property, falling back to the environment variable.

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));

// Name server address: system property first, environment variable as the fallback.

private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));

// Broker addresses; both default to the local address returned by RemotingUtil.

private String brokerIP1 = RemotingUtil.getLocalAddress();

private String brokerIP2 = RemotingUtil.getLocalAddress();

// Broker name; defaults to the value returned by localHostName().

private String brokerName = localHostName();

// Name of the cluster this broker belongs to.

private String brokerClusterName = "DefaultCluster";

// Broker id; defaults to the master id constant.
private long brokerId = MixAll.MASTER_ID;

// Broker permission; defaults to read | write.
// Original note: only a master can store messages (not verifiable from this excerpt).

private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;

// Default number of queues per topic.

private int defaultTopicQueueNums = 8;

// Whether topics are created automatically.

private boolean autoCreateTopicEnable = true;

// Original note: "controls read/write permission".
// NOTE(review): the field name suggests a cluster-level topic switch instead; confirm.

private boolean clusterTopicEnable = true;

// Original note: "controls read/write permission".
// NOTE(review): the field name suggests a broker-level topic switch instead; confirm.

private boolean brokerTopicEnable = true;

// Original note was truncated ("whether to automatically ...").
// Presumably: whether subscription groups are created automatically; confirm.

private boolean autoCreateSubscriptionGroup = true;

// The original author did not know what this is for; it defaults to an empty string.
// NOTE(review): presumably names a message-store plug-in; confirm.

private String messageStorePlugIn = "";

// Default topic for message traces.

private String msgTraceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;

// Message tracing is disabled by default.

private boolean traceTopicEnable = false;

// Thread count for the send-message pool; fixed at 1 here.

private int sendMessageThreadPoolNums = 1; // commented-out alternative: 16 + Runtime.getRuntime().availableProcessors() * 4;

// Thread count for the pull-message pool; grows with the number of available processors.

private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;

// Thread count for the query-message pool; grows with the number of available processors.

private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();

// Thread count for the admin (back-office) pool.

private int adminBrokerThreadPoolNums = 16;

// Thread count for the client-management pool.

private int clientManageThreadPoolNums = 32;

// Thread count for the consumer-management pool.

private int consumerManageThreadPoolNums = 32;

// Thread count for the heartbeat pool; never more than 32.

private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());

// Thread count for the transactional-message pool, mainly used to commit or roll back transactional messages.

private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;

// Period at which consumer offsets are persisted; used with TimeUnit.MILLISECONDS, so 5 s.

private int flushConsumerOffsetInterval = 1000 * 5;

// NOTE(review): presumably a millisecond interval (60 s) for offset history; confirm.
private int flushConsumerOffsetHistoryInterval = 1000 * 60;

// Whether transactional messages are rejected.

private boolean rejectTransactionMessage = false;

// Whether the name server address is fetched from an address server.

private boolean fetchNamesrvAddrByAddressServer = false;

// Capacities of the bounded work queues used by the thread pools.

private int sendThreadPoolQueueCapacity = 10000;

private int pullThreadPoolQueueCapacity = 100000;

private int queryThreadPoolQueueCapacity = 20000;

private int clientManagerThreadPoolQueueCapacity = 1000000;

private int consumerManagerThreadPoolQueueCapacity = 1000000;

private int heartbeatThreadPoolQueueCapacity = 50000;

private int endTransactionPoolQueueCapacity = 100000;

// Number of filter servers (the original author was unsure what they are used for).

private int filterServerNums = 0;

// Whether long polling is enabled.
// (The original note said "long connection"; the field name says long polling.)

private boolean longPollingEnable = true;

// Short-polling time in milliseconds, per the field name (the original note said "short connection time").

private long shortPollingTimeMills = 1000;

// Original note: whether the broker should trigger a rebalance when the consumer side changes.

private boolean notifyConsumerIdsChangedEnable = true;

// Whether heap memory is used when transferring messages.

private boolean transferMsgByHeap = true;

// Broker registration timeout, in milliseconds per the field name.

private int registerBrokerTimeoutMills = 6000;

// Whether reading from a slave is enabled.

private boolean slaveReadEnable = false;

// Message backlog threshold: 16 * 1024^3 (16 GiB if the unit is bytes; confirm).

private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;

// Whether fast failure is enabled.

private boolean brokerFastFailureEnable = true;


1.2 创建netty服务器配置

// Listen port; defaults to 8888 here, while the broker startup code sets it to 10911.

private int listenPort = 8888;

// Number of worker threads.

private int serverWorkerThreads = 8;

// Number of callback-executor threads; 0 by default.
// NOTE(review): how a value of 0 is interpreted is not visible in this excerpt.

private int serverCallbackExecutorThreads = 0;

// Number of selector threads.

private int serverSelectorThreads = 3;

// Number of semaphore permits for one-way sends.

private int serverOnewaySemaphoreValue = 256;

// Number of semaphore permits for asynchronous sends.
// (The original note said "synchronous", which contradicts the field name.)

private int serverAsyncSemaphoreValue = 64;

// Maximum channel idle time in seconds, per the field name.
// (The original note called it the heartbeat interval.)

private int serverChannelMaxIdleTimeSeconds = 120;

// Socket send-buffer size.

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;

// Socket receive-buffer size.

private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;

// Whether the pooled ByteBuf allocator is used.

private boolean serverPooledByteBufAllocatorEnable = true;

// Whether the native epoll selector is used (original note: "whether to enable another IO model").

private boolean useEpollNativeSelector = false;


1.3 创建netty客户端配置

	// Number of worker threads.

private int clientWorkerThreads = 4;

// Callback-executor thread count; defaults to the number of available processors.
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();

// Semaphore values for one-way and asynchronous calls, taken from NettySystemConfig.
private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;

private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;

// Connection timeout, in milliseconds per the field name.

private int connectTimeoutMillis = 3000;

// Maximum channel idle time in seconds, per the field name.
// (The original note called it the heartbeat time.)

private int clientChannelMaxIdleTimeSeconds = 120;

// Socket send/receive buffer sizes, taken from NettySystemConfig.
private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;

private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;

// The pooled ByteBuf allocator is off by default on the client side.
private boolean clientPooledByteBufAllocatorEnable = false;

// NOTE(review): presumably closes the socket when a request times out; confirm.
private boolean clientCloseSocketIfTimeout = false;


1.4 有关消息存储的配置

有关存储的配置参数太多,这里不再逐一列出,大家可以对照 MessageStoreConfig 的源码自行查看。

1.6 创建BrokerController

         // Body of the BrokerController constructor: keep the four configuration objects, then
         // build the collaborating managers, work queues and helper services.
         this.brokerConfig = brokerConfig; // broker configuration

this.nettyServerConfig = nettyServerConfig; // Netty server configuration

this.nettyClientConfig = nettyClientConfig;

this.messageStoreConfig = messageStoreConfig; // message-store configuration

// Managers and services; most of them receive this controller as a back-reference.
this.consumerOffsetManager = new ConsumerOffsetManager(this);

this.topicConfigManager = new TopicConfigManager(this);

this.pullMessageProcessor = new PullMessageProcessor(this);

this.pullRequestHoldService = new PullRequestHoldService(this);

// The arriving-message listener wraps the hold service created on the previous line.
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);

this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);

// The consumer manager is handed the change listener created on the previous line.
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);

this.consumerFilterManager = new ConsumerFilterManager(this);

this.producerManager = new ProducerManager();

this.clientHousekeepingService = new ClientHousekeepingService(this);

this.broker2Client = new Broker2Client(this);

this.subscriptionGroupManager = new SubscriptionGroupManager(this);

// Outbound API built from the Netty client configuration.
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);

this.filterServerManager = new FilterServerManager(this);

this.slaveSynchronize = new SlaveSynchronize(this);

// Bounded work queues, one per thread pool, with capacities taken from BrokerConfig.
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());

this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());

this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());

this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());

this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());

this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

// Statistics manager keyed by the broker's cluster name.
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());

// Store host = brokerIP1 combined with the Netty server listen port.
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

this.brokerFastFailure = new BrokerFastFailure(this);

// Configuration object built from the logger, the broker config path and all four config objects.
this.configuration = new Configuration(

log,

BrokerPathConfigHelper.getBrokerConfigPath(),

this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig

);

1.7 BrokerController的初始化

/**
 * Initializes the controller: loads the persisted manager state, builds and loads the message
 * store, then creates the remoting servers, the worker thread pools and the periodic
 * background tasks.
 *
 * @return {@code true} if every load step succeeded; {@code false} otherwise, in which case
 *         the servers, thread pools and scheduled tasks are not created
 * @throws CloneNotSupportedException if cloning the Netty server configuration fails
 */
public boolean initialize() throws CloneNotSupportedException {

// Load persisted configuration; each load() only runs if all previous ones succeeded.

boolean result = this.topicConfigManager.load();

result = result && this.consumerOffsetManager.load();

result = result && this.subscriptionGroupManager.load();

result = result && this.consumerFilterManager.load();

if (result) {

try {

// Default message store

this.messageStore =

new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,

this.brokerConfig);

// When the DLedger commit log is enabled, register a role-change handler with its leader elector

if (messageStoreConfig.isEnableDLegerCommitLog()) {

DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);

((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);

}

// Broker statistics backed by the message store

this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);

// Load plug-ins; whatever the factory returns replaces the message store reference

MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);

this.messageStore = MessageStoreFactory.build(context, this.messageStore);

// The bitmap-calculating dispatcher is placed at the head of the dispatcher list
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));

} catch (IOException e) {

// An I/O failure while building the store marks the whole initialization as failed
result = false;

log.error("Failed to initialize", e);

}

}

// Important step (per the original note): mainly loads the message positions and the queues
// that belong to each topic. Short-circuited when an earlier step already failed.

result = result && this.messageStore.load();

if (result) {

// Create the remoting server

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);

// The fast server uses a clone of the server config that listens on (listen port - 2)
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();

fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);

// Create the fast remoting server

this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);

// Send-message thread pool (core size == max size, 60 s keep-alive)

this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(

this.brokerConfig.getSendMessageThreadPoolNums(),

this.brokerConfig.getSendMessageThreadPoolNums(),

1000 * 60,

TimeUnit.MILLISECONDS,

this.sendThreadPoolQueue,

new ThreadFactoryImpl("SendMessageThread_"));

// Pull-message thread pool

this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(

this.brokerConfig.getPullMessageThreadPoolNums(),

this.brokerConfig.getPullMessageThreadPoolNums(),

1000 * 60,

TimeUnit.MILLISECONDS,

this.pullThreadPoolQueue,

new ThreadFactoryImpl("PullMessageThread_"));

// Query-message thread pool

this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(

this.brokerConfig.getQueryMessageThreadPoolNums(),

this.brokerConfig.getQueryMessageThreadPoolNums(),

1000 * 60,

TimeUnit.MILLISECONDS,

this.queryThreadPoolQueue,

new ThreadFactoryImpl("QueryMessageThread_"));

// Admin thread pool (plain fixed pool from Executors)
this.adminBrokerExecutor =

Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(

"AdminBrokerThread_"));

// Client-management thread pool
this.clientManageExecutor = new ThreadPoolExecutor(

this.brokerConfig.getClientManageThreadPoolNums(),

this.brokerConfig.getClientManageThreadPoolNums(),

1000 * 60,

TimeUnit.MILLISECONDS,

this.clientManagerThreadPoolQueue,

new ThreadFactoryImpl("ClientManageThread_"));

// Heartbeat thread pool

this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(

this.brokerConfig.getHeartbeatThreadPoolNums(),

this.brokerConfig.getHeartbeatThreadPoolNums(),

1000 * 60,

TimeUnit.MILLISECONDS,

this.heartbeatThreadPoolQueue,

new ThreadFactoryImpl("HeartbeatThread_", true));

// Thread pool for committing or rolling back transactions

this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(

this.brokerConfig.getEndTransactionThreadPoolNums(),

this.brokerConfig.getEndTransactionThreadPoolNums(),

1000 * 60,

TimeUnit.MILLISECONDS,

this.endTransactionThreadPoolQueue,

new ThreadFactoryImpl("EndTransactionThread_"));

// Consumer-management thread pool

this.consumerManageExecutor =

Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(

"ConsumerManageThread_"));

// Register the request processors

this.registerProcessor();

// Delay until the time returned by computeNextMorningTimeMillis(), then repeat every 24 hours
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();

final long period = 1000 * 60 * 60 * 24;

// Daily broker-stats record (original note: prints the previous day's message total)

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.getBrokerStats().record();

} catch (Throwable e) {

log.error("schedule record error.", e);

}

}

}, initialDelay, period, TimeUnit.MILLISECONDS);

// Persist consumer offsets: first run after 10 s, then every getFlushConsumerOffsetInterval() ms

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.consumerOffsetManager.persist();

} catch (Throwable e) {

log.error("schedule persist consumerOffset error.", e);

}

}

}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

// Persist consumer filter data: first run after 10 s, then every 10 s
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.consumerFilterManager.persist();

} catch (Throwable e) {

log.error("schedule persist consumer filter error.", e);

}

}

}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

// Call protectBroker() every 3 minutes
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.protectBroker();

} catch (Throwable e) {

log.error("protectBroker error.", e);

}

}

}, 3, 3, TimeUnit.MINUTES);

// Call printWaterMark() every second, starting after 10 s
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.printWaterMark();

} catch (Throwable e) {

log.error("printWaterMark error.", e);

}

}

}, 10, 1, TimeUnit.SECONDS);

// Log how many bytes dispatch is behind the commit log: after 10 s, then every 60 s
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

} catch (Throwable e) {

log.error("schedule dispatchBehindBytes error.", e);

}

}

}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

// A configured name server address wins; otherwise, if enabled, fetch the address
// periodically (after 10 s, then every 2 minutes)
if (this.brokerConfig.getNamesrvAddr() != null) {

this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());

log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());

} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

} catch (Throwable e) {

log.error("ScheduledTask fetchNameServerAddr exception", e);

}

}

}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);

}

// Master/slave handling applies only when the DLedger commit log is not in use
if (!messageStoreConfig.isEnableDLegerCommitLog()) {

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {

// Slave: use the configured HA master address when it is present and at least 6 characters
// long; otherwise flag that the address must be updated periodically
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {

this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());

this.updateMasterHAServerAddrPeriodically = false;

} else {

this.updateMasterHAServerAddrPeriodically = true;

}

} else {

// Non-slave: call printMasterAndSlaveDiff() after 10 s, then every 60 s
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.printMasterAndSlaveDiff();

} catch (Throwable e) {

log.error("schedule printMasterAndSlaveDiff error.", e);

}

}

}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

}

}

// Final setup steps: transaction support, ACL and RPC hooks (per the method names)
initialTransaction();

initialAcl();

initialRpcHooks();

}

return result;

}


可以看到,Broker的创建过程中包含了各种参数的配置、各种线程池的创建以及各种定时任务的开启等等。

以上是 Rocketmq中的Broker启动流程续(五) 的全部内容, 来源链接: utcz.com/z/514373.html

回到顶部