Rocketmq中的nameser启动流程(三)
1.程序启动的入口
程序启动的入口,在org.apache.rocketmq.namesrv.NamesrvStartup中
public static void main(String[] args) { main0(args);
}
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args); //2.创建NamesrvController
start(controller);//3.启动namesrv
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
namesrv的启动主流程很清晰就两件事情,一个是创建流程,一个是启动流程,但是每个流程里面,又有很多的点
2.创建流程
我删除一些零散的逻辑,下面是主要的逻辑
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { // 2.1 创建nameserver的配置文件
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// 2.2 创建对应的服务端配置文件
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 设置nameserver的监听端口号
nettyServerConfig.setListenPort(9876);
// 2.3 创建NameServer
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
return controller;
}
2.1 创建nameserver的配置文件(NamesrvConfig)
// rocketmq的路径,这个路径下面可以放置一下配置文件,例如logback.xml文件 private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// kvConfigPath
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// configStorePath
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
//是否顺序消息
private boolean orderMessageEnable = false;
2.2 创建对应的服务端配置文件(NettyServerConfig)
//端口号,nameserver的端口号是:9876 private int listenPort = 8888;
//工作线程数
private int serverWorkerThreads = 8;
// 目前在nameserver端没有用处
private int serverCallbackExecutorThreads = 0;
//selector线程数
private int serverSelectorThreads = 3;
// 单向发送消息的信号量个数
private int serverOnewaySemaphoreValue = 256;
//同步发送消息的信号量个数
private int serverAsyncSemaphoreValue = 64;
//心跳间隔时间
private int serverChannelMaxIdleTimeSeconds = 120;
// 发送缓冲区大小
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// 接受缓冲区大小
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// 是否使用内存池
private boolean serverPooledByteBufAllocatorEnable = true;
//是否启动别的IO模型
private boolean useEpollNativeSelector = false;
2.3 创建NameServer
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { //nameserver的配置
this.namesrvConfig = namesrvConfig;
//有关netty服务端的配置
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
//路由信息的配置,这个比较重要
this.routeInfoManager = new RouteInfoManager();
//监听broker下线信息,并通过RouteInfoManager中的方法移除内存中的broker信息
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
//配置文件存储逻辑
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
//我们来看下一下 RouteInfoManager 里面都什么什么
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); //topicQueueTable 存放该topic下面对应的队列信息
this.brokerAddrTable = new HashMap<String, BrokerData>(128); //brokerAddrTable 存放broker对应的信息
this.clusterAddrTable = new HashMap<String, Set<String>>(32); // clusterAddrTable 存放集群对应的信息
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); //brokerLiveTable 存放活跃的broker信息
this.filterServerTable = new HashMap<String, List<String>>(256); //filterServerTable 存放需要过滤的信息
}
2.4 创建流程小结
主要是一些参数的设置和一些路由信息的变量初始化话工作
3.启动nameserver
先看一下启动的整体流程
public static NamesrvController start(final NamesrvController controller) throws Exception { //3.1 初始化namesrv
boolean initResult = controller.initialize();
//3.2 向JVM中注册Hook函数 JVM关闭之前,执行该Hook函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
//3.3 nameserv启动
controller.start();
return controller;
}
主要分为了三个部分,初始化,向JVM中注册Hook函数,然后启动nameserver
3.1 初始化nameserver
public boolean initialize() { //加载配置
this.kvConfigManager.load();
//3.1.1 初始化netty服务端
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//3.1.2初始化工作线程池,8个线程
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//3.1.3注册默认处理器
this.registerProcessor();
//3.1.4每隔10分钟扫描一下不在存活的broker并剔除不存活的机器
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//3.1.5每隔十分钟打印配置信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
return true;
}
主要是初始化netty服务端数据,初始化工作线程池,注册一些默认的处理器,和启动两个定时任务,下面看下这几个流程都干了什么
3.1.1 初始化netty服务端
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) { //设置单向发送的信号量个数和同步发送的信号量个数
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
//设置brokerHousekeepingService
this.channelEventListener = channelEventListener;
//获取线程数
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
//公共服务线程池(大概含义就是如果一个任务找不到对应的线程池处理,那么就放到这个地方)
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
//根据操作系统选择,使用Epoll方式还是使用NIO方式
if (useEpoll()) {
//监听TCP连接
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
//socket 注册到 selector ,3个selector
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
}
3.1.2初始化工作线程池 (这个简单就不分析了)
3.1.3注册默认处理器
private void registerProcessor() { if (namesrvConfig.isClusterTest()) { //从namesrvConfig中我们知道,该值为false,那么逻辑就是下面的
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
//这个最主要的就是把 处理器和线程池关联起来
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
3.1.4 和 3.1.5 启动了两个定时任务
3.2 向JVM注册Hook函数
//这是一种优雅的停机方式Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
3.3 nameserv启动
这里是nameserver真正的启动流程,前面那么多步骤都是为了这个事情
public void start() throws Exception { //启动remote-server
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
public void start() {
// 事件处理线程池
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
//注册公共的handler
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //心跳检测
connectionManageHandler,
serverHandler
);
}
});
//是否启用内存池
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
//启动broker监听
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
//启动超时任务
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
4.总结
nameserver在启动的过程中,可以看到有各种参数的设置和定时job启动,还有各种线程池的初始化等等工作
5.参考资料
http://rocketmq.apache.org/
以上是 Rocketmq中的nameser启动流程(三) 的全部内容, 来源链接: utcz.com/z/514165.html