Rocketmq中的nameser启动流程(三)

编程


1.程序启动的入口

程序启动的入口,在org.apache.rocketmq.namesrv.NamesrvStartup中

public static void main(String[] args) {

main0(args);

}

public static NamesrvController main0(String[] args) {

try {

NamesrvController controller = createNamesrvController(args); //2.创建NamesrvController

start(controller);//3.启动namesrv

String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

log.info(tip);

System.out.printf("%s%n", tip);

return controller;

} catch (Throwable e) {

e.printStackTrace();

System.exit(-1);

}

return null;

}

namesrv的启动主流程很清晰就两件事情,一个是创建流程,一个是启动流程,但是每个流程里面,又有很多的点


2.创建流程

我删除一些零散的逻辑,下面是主要的逻辑

 public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {

// 2.1 创建nameserver的配置文件

final NamesrvConfig namesrvConfig = new NamesrvConfig();

// 2.2 创建对应的服务端配置文件

final NettyServerConfig nettyServerConfig = new NettyServerConfig();

// 设置nameserver的监听端口号

nettyServerConfig.setListenPort(9876);

// 2.3 创建NameServer

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

return controller;

}

2.1 创建nameserver的配置文件(NamesrvConfig)

	//  rocketmq的路径,这个路径下面可以放置一下配置文件,例如logback.xml文件

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));

// kvConfigPath

private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";

// configStorePath

private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";

private String productEnvName = "center";

private boolean clusterTest = false;

//是否顺序消息

private boolean orderMessageEnable = false;

2.2 创建对应的服务端配置文件(NettyServerConfig)

	//端口号,nameserver的端口号是:9876

private int listenPort = 8888;

//工作线程数

private int serverWorkerThreads = 8;

// 目前在nameserver端没有用处

private int serverCallbackExecutorThreads = 0;

//selector线程数

private int serverSelectorThreads = 3;

// 单向发送消息的信号量个数

private int serverOnewaySemaphoreValue = 256;

//同步发送消息的信号量个数

private int serverAsyncSemaphoreValue = 64;

//心跳间隔时间

private int serverChannelMaxIdleTimeSeconds = 120;

// 发送缓冲区大小

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;

// 接受缓冲区大小

private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;

// 是否使用内存池

private boolean serverPooledByteBufAllocatorEnable = true;

//是否启动别的IO模型

private boolean useEpollNativeSelector = false;

2.3 创建NameServer

 public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {

//nameserver的配置

this.namesrvConfig = namesrvConfig;

//有关netty服务端的配置

this.nettyServerConfig = nettyServerConfig;

this.kvConfigManager = new KVConfigManager(this);

//路由信息的配置,这个比较重要

this.routeInfoManager = new RouteInfoManager();

//监听broker下线信息,并通过RouteInfoManager中的方法移除内存中的broker信息

this.brokerHousekeepingService = new BrokerHousekeepingService(this);

this.configuration = new Configuration(

log,

this.namesrvConfig, this.nettyServerConfig

);

//配置文件存储逻辑

this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");

}

//我们来看下一下 RouteInfoManager 里面都什么什么

public RouteInfoManager() {

this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); //topicQueueTable 存放该topic下面对应的队列信息

this.brokerAddrTable = new HashMap<String, BrokerData>(128); //brokerAddrTable 存放broker对应的信息

this.clusterAddrTable = new HashMap<String, Set<String>>(32); // clusterAddrTable 存放集群对应的信息

this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); //brokerLiveTable 存放活跃的broker信息

this.filterServerTable = new HashMap<String, List<String>>(256); //filterServerTable 存放需要过滤的信息

}

2.4 创建流程小结

主要是一些参数的设置和一些路由信息的变量初始化话工作


3.启动nameserver

先看一下启动的整体流程

public static NamesrvController start(final NamesrvController controller) throws Exception {

//3.1 初始化namesrv

boolean initResult = controller.initialize();

//3.2 向JVM中注册Hook函数 JVM关闭之前,执行该Hook函数

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {

@Override

public Void call() throws Exception {

controller.shutdown();

return null;

}

}));

//3.3 nameserv启动

controller.start();

return controller;

}

主要分为了三个部分,初始化,向JVM中注册Hook函数,然后启动nameserver


3.1 初始化nameserver

public boolean initialize() {

//加载配置

this.kvConfigManager.load();

//3.1.1 初始化netty服务端

this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

//3.1.2初始化工作线程池,8个线程

this.remotingExecutor =

Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

//3.1.3注册默认处理器

this.registerProcessor();

//3.1.4每隔10分钟扫描一下不在存活的broker并剔除不存活的机器

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

NamesrvController.this.routeInfoManager.scanNotActiveBroker();

}

}, 5, 10, TimeUnit.SECONDS);

//3.1.5每隔十分钟打印配置信息

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

NamesrvController.this.kvConfigManager.printAllPeriodically();

}

}, 1, 10, TimeUnit.MINUTES);

return true;

}

主要是初始化netty服务端数据,初始化工作线程池,注册一些默认的处理器,和启动两个定时任务,下面看下这几个流程都干了什么

3.1.1 初始化netty服务端

 public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {

//设置单向发送的信号量个数和同步发送的信号量个数

super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());

this.serverBootstrap = new ServerBootstrap();

this.nettyServerConfig = nettyServerConfig;

//设置brokerHousekeepingService

this.channelEventListener = channelEventListener;

//获取线程数

int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();

if (publicThreadNums <= 0) {

publicThreadNums = 4;

}

//公共服务线程池(大概含义就是如果一个任务找不到对应的线程池处理,那么就放到这个地方)

this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());

}

});

//根据操作系统选择,使用Epoll方式还是使用NIO方式

if (useEpoll()) {

//监听TCP连接

this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));

}

});

//socket 注册到 selector ,3个selector

this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

private int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override

public Thread newThread(Runnable r) {

return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));

}

});

} else {

this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));

}

});

this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

private int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override

public Thread newThread(Runnable r) {

return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));

}

});

}

}

3.1.2初始化工作线程池 (这个简单就不分析了)

3.1.3注册默认处理器

private void registerProcessor() {

if (namesrvConfig.isClusterTest()) { //从namesrvConfig中我们知道,该值为false,那么逻辑就是下面的

this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),

this.remotingExecutor);

} else {

//这个最主要的就是把 处理器和线程池关联起来

this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);

}

}

3.1.4 和 3.1.5 启动了两个定时任务


3.2 向JVM注册Hook函数

//这是一种优雅的停机方式

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {

@Override

public Void call() throws Exception {

controller.shutdown();

return null;

}

}));


3.3 nameserv启动

这里是nameserver真正的启动流程,前面那么多步骤都是为了这个事情

  public void start() throws Exception {

//启动remote-server

this.remotingServer.start();

if (this.fileWatchService != null) {

this.fileWatchService.start();

}

}

public void start() {

// 事件处理线程池

this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(

nettyServerConfig.getServerWorkerThreads(),

new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());

}

});

//注册公共的handler

prepareSharableHandlers();

ServerBootstrap childHandler =

this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)

.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 1024)

.option(ChannelOption.SO_REUSEADDR, true)

.option(ChannelOption.SO_KEEPALIVE, false)

.childOption(ChannelOption.TCP_NODELAY, true)

.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())

.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())

.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline()

.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)

.addLast(defaultEventExecutorGroup,

encoder,

new NettyDecoder(),

new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //心跳检测

connectionManageHandler,

serverHandler

);

}

});

//是否启用内存池

if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {

childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

}

try {

ChannelFuture sync = this.serverBootstrap.bind().sync();

InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();

this.port = addr.getPort();

} catch (InterruptedException e1) {

throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);

}

//启动broker监听

if (this.channelEventListener != null) {

this.nettyEventExecutor.start();

}

//启动超时任务

this.timer.scheduleAtFixedRate(new TimerTask() {

@Override

public void run() {

try {

NettyRemotingServer.this.scanResponseTable();

} catch (Throwable e) {

log.error("scanResponseTable exception", e);

}

}

}, 1000 * 3, 1000);

}


4.总结

nameserver在启动的过程中,可以看到有各种参数的设置和定时job启动,还有各种线程池的初始化等等工作

5.参考资料

http://rocketmq.apache.org/

以上是 Rocketmq中的nameser启动流程(三) 的全部内容, 来源链接: utcz.com/z/514165.html

回到顶部