Java学习笔记14Netty线程模型及源码解析
Java学习笔记14-Netty线程模型及源码解析
Netty介绍
Netty是一个高性能。高可扩展性的异步事件驱动的网络应用程序框架,它极大地简化了TCP和UDP客户端和服务器开发等网络编程。Netty重要的四个内容:
- Reactor线程模型:一种高性能的多线程程序设计思路
- Netty中自己定义的Channel概念:增强版的通道概念
- ChannelPipeline职责链设计模式:事件处理机制
- 内存管理:增强的ByteBuf缓冲区
Netty线程模型
为了让NIO处理更好的利用多线程特性,Netty实现了Reactor线程模型。
Reactor模型中有四个概念:
- Resources资源(请求/任务)
- Synchronous Event Demultiplexer 同步事件复用器
- Dispatcher 分配器
- Request Handler 请求处理器
Clients Reactor线程(Boss) Worker线程池
请求 -->
事件复用器 --> Dispatcher -->
处理器
Event Loop(Selector事件轮询)--> Request Handler长耗时任务或者I/O处理read write.. -->
EventLoopGroup初始化过程
EventLoopGroup构建一组EventLoop(线程),一般采用多路复用Reactor线程模型使用两组EventLoopGroup处理不同通道不同的事件,一组Main主要处理服务端accept事件,一组Sub主要处理客户端I/O事件
EventLoopGroup(Main) EventLoopGroup(Sub)
EventLoop(executor线程执行器,EventLoop对应一个线程)↓
↓ Run(轮询通道事件分配执行task任务)↓
处理请求
TaskQueue(任务队列)| Selector(事件选择器)
其他EventLoop… --> 分配给 Sub
I/O处理
主要处理服务端accept
主要处理客户端I/O
执行过程 Netty4.1.44.Final源码位置
new NioEventLoopGroup():构造函数 ↓
NioEventLoopGroup:43行
确定线程数量:默认cpus*2 ↓
MultithreadEventLoopGroup:52行,40行
new Executor:构建线程执行器 ↓
MultithreadEventExecutorGroup:76行
for->newChild():构建EventLoop ↓
MultithreadEventExecutorGroup:84行
new EventExecutorChooser
MultithreadEventExecutorGroup:111行
EventLoop的启动
EventLoop自身实现了Executor接口,当调用executor方法提交任务时,则判断是否启动,未启动则调用内置的executor创建新线程来触发run方法执行。
执行过程 Netty4.1.44.Final源码位置
executor:请求执行任务↓
SingleThreadEventExecutor:execute():828行
addTask:添加到任务队列↓
SingleThreadEventExecutor:addTask():840行
判断是否是EventLoop自身调用↓
SingleThreadEventExecutor:inEventLoop():841行
startThread -> doStartThread↓
SingleThreadEventExecutor:startThread():842行
使用executor创建新线程执行run
SingleThreadEventExecutor:doStartThread():990行;SingleThreadEventExecutor.this.run():1001行
Bind绑定端口过程
执行过程 Netty4.1.44.Final源码位置
bind(端口):ServerBootstrap.bind(20480)↓
AbstractBootstrap:bind():233行
创建和初始化Channel↓
AbstractBootstrap:initAndRegister():260行;AbstractBootstrap:newChannel():298行;ServerBootstrap:init():125行
注册到EventLoop中的Selector上(提交任务到EventLoop执行。注册完成后,在继续绑定)↓
AbstractBootstrap:config().group().register(channel):311行;AbstractChannel:register():452行;doRegister():497行;AbstractNioChannel:doRegister():380行
doBind0() -> channel.bind↓
AbstractBootstrap:doBind0():344行
pipeline.bind↓
AbstractChannel:pipeline.bind:247行;AbstractChannelHandlerContext:bind:479行;AbstractChannelHandlerContext:invokeBind:506行
HandlerContext.bind↓
DefaultChannelPipeline:unsafe.bind:1346行
AbstractUnsafe.bind↓
AbstractChannel:doBind:551行
NioServerSocketChannel.doBind
NioServerSocketChannel:doBind:132行
Channel概念
Netty中的Channel是一个抽象的概念,可以理解为对 JDK NIO Channel的增强和拓展。
增加了很多属性和方法,完整信息可以看代码注释,下面罗列几个常见的属性和方法:
AbstractChannel
- pipeline DefaultChannelPipeline // 通道内事件处理链路
- eventLoop EventLoop // 绑定的EventLoop,用户执行操作
- unsafe Unsafe // 提供 I/O相关操作的封装
…
+ config() ChannelConfig // 返回通道配置信息
+ read() Channel // 开始读数据,触发读取链路调用
+ write(Object msg) ChannelFuture // 写数据,触发链路调用
+ bind(SocketAddresss socketAddresss) ChannelFuture // 绑定
…
Netty线程模型测试代码
import java.io.IOException;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: Wenx
* @Description:
* @Date: Created in 2019/11/25 13:46
* @Modified By:
*/
public class MyNettyServer {
public static void main(String[] args) {
// 创建EventLoopGroup accept线程组
MyEventLoopGroup mainGroup = new MyEventLoopGroup(1);
// 创建EventLoopGroup I/O线程组
MyEventLoopGroup subGroup = new MyEventLoopGroup(2);
try {
// 服务端启动引导工具类
MyServerBootstrap b = new MyServerBootstrap();
// 配置服务端处理的reactor线程组以及服务端的其他配置
b.group(mainGroup, subGroup).channel(ServerSocketChannel.class)
.option("SO_BACKLOG", 100)
.handler("new LoggingHandler()")
.childHandler("new EchoServerHandler()");
// 通过bind启动服务
FutureTask f = b.bind(20480);
// 阻塞主线程,直到网络服务被关闭
f.get();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭线程组
mainGroup.shutdown();
subGroup.shutdown();
}
}
}
class MyServerBootstrap {
/**
* main线程组
*/
private volatile MyEventLoopGroup group;
/**
* sub线程组
*/
private volatile MyEventLoopGroup childGroup;
/**
* 绑定线程组,单reactor模式
*
* @param group main/sub线程组
* @return
*/
public MyServerBootstrap group(MyEventLoopGroup group) {
return group(group, group);
}
/**
* 绑定线程组,多路reactor模式
*
* @param parentGroup main线程组
* @param childGroup sub线程组
* @return
*/
public MyServerBootstrap group(MyEventLoopGroup parentGroup, MyEventLoopGroup childGroup) {
if (parentGroup == null) {
throw new NullPointerException("group");
}
if (group != null) {
throw new IllegalStateException("group set already");
}
group = parentGroup;
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
/**
* 配置channel工厂
*
* @param channelClass channel工厂类
* @return
*/
public MyServerBootstrap channel(Class<?> channelClass) {
// 伪代码,这里为channelFactory赋值,方便后续初始化channel使用
// 因为我们是NIO TCP,后续就直接使用ServerSocketChannel,SocketChannel
return this;
}
/**
* 配置参数
*
* @param option 参数KEY
* @param value 参数值
* @return
*/
public MyServerBootstrap option(Object option, Object value) {
// 伪代码,这里为往options添加option,方便后续配置使用
return this;
}
/**
* main线程组处理器
*
* @param handler 处理器
* @return
*/
public MyServerBootstrap handler(Object handler) {
// 伪代码,这里将handler添加进groupHandler,给后续pipeline使用
// 我们后面就模拟日志Handler,直接打印accept时客户端信息
return this;
}
/**
* sub线程组处理器
*
* @param childHandler 处理器
* @return
*/
public MyServerBootstrap childHandler(Object childHandler) {
// 伪代码,这里将handler添加进childHandler,给后续pipeline使用
// 我们后面就模拟EchoHandler,直接打印客户端发送的数据及将数据处理返回
return this;
}
/**
* 绑定端口开启服务
*
* @param inetPort
* @return
*/
public FutureTask bind(int inetPort) {
InetSocketAddress localAddress = new InetSocketAddress(inetPort);
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
try {
// 1. 初始化和注册
ServerSocketChannel channel = initAndRegister();
// 2. 绑定端口开始服务
channel.bind(localAddress);
} catch (Exception e) {
e.printStackTrace();
}
FutureTask futureTask = new FutureTask(() -> null);
return futureTask;
}
/**
* 初始化及注册
*
* @return Channel
* @throws Exception
*/
final ServerSocketChannel initAndRegister() throws Exception {
// 1. 初始化channel,通过channelFactory来newChannel,这里直接创建ServerSocketChannel
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
// 2. 初始化pipeline责任链
// 伪代码……
// group载入handler,处理accept,模拟日志Handler打印客户端信息
// childGroup载入childHandler,处理I/O,模拟EchoHandler打印客户端发送的数据及将数据处理返回
// 3. 将serverSocketChannel注册绑定到selector
// Netty把Channel封装增强带许多参数,我们直接传this就不封装了,方便理解
SelectionKey selectionKey = group.register(channel, this);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
return channel;
}
/**
* 本应把Channel封装增强带许多参数,我们就不封装了,方便理解
*
* @param key
* @throws Exception
*/
public void channelHandler(SelectionKey key) throws Exception {
SelectableChannel channel = key.channel();
if (channel instanceof ServerSocketChannel) {
acceptHandler((ServerSocketChannel) key.channel());
} else if (channel instanceof SocketChannel) {
ioHandler((SocketChannel) key.channel());
}
}
/**
* mainReactor 处理accept
*
* @param server 服务端Channel
* @throws Exception
*/
public void acceptHandler(ServerSocketChannel server) throws Exception {
SocketChannel client = server.accept();
client.configureBlocking(false);
// 收到接收到客户端连接后,将SocketChannel的read事件注册绑定到selector选择器上,分发给I/O线程继续去读取数据
SelectionKey selectionKey = childGroup.register(client, this);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "接收连接 : " + client.getRemoteAddress());
}
/**
* subReactor 处理I/O
*
* @param client 客户端Channel
* @throws Exception
*/
public void ioHandler(SocketChannel client) throws Exception {
ByteBuffer readBuff = ByteBuffer.allocate(1024);
while (client.isOpen() && client.read(readBuff) != -1) {
// 长连接情况下,需要手动判断数据有没有读取结束
// 此处做一个简单的判断:超过0字节就认为请求结束了
if (readBuff.position() > 0) {
break;
}
}
// 如果没数据了, 则不继续后面的处理
if (readBuff.position() == 0) {
return;
}
readBuff.flip();
byte[] content = new byte[readBuff.limit()];
readBuff.get(content);
String response = "已收到:" + new String(content);
System.out.println(response);
System.out.println("来自:" + client.getRemoteAddress());
// TODO 业务操作 数据库 接口调用等等
ByteBuffer writeBuff = ByteBuffer.wrap(response.getBytes());
while (writeBuff.hasRemaining()) {
client.write(writeBuff);
}
}
}
class MyEventLoopGroup {
/**
* 默认EventLoop线程数量:cpus*2
*/
private static final int DEFAULT_EVENT_LOOP_THREADS = Runtime.getRuntime().availableProcessors() * 2;
/**
* EventLoop集合
*/
private final MyEventLoop[] children;
/**
* Chooser索引
*/
private final AtomicInteger idx = new AtomicInteger();
/**
* 构造函数
*/
public MyEventLoopGroup() {
this(0);
}
/**
* 构造函数
*
* @param nThreads EventLoop线程数量
*/
public MyEventLoopGroup(int nThreads) {
if (nThreads <= 0) {
nThreads = DEFAULT_EVENT_LOOP_THREADS;
}
children = new MyEventLoop[nThreads];
for (int i = 0; i < nThreads; i++) {
try {
children[i] = newChild();
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
}
}
}
/**
* 创建EventLoop线程
*
* @return EventLoop
* @throws Exception
*/
public MyEventLoop newChild() throws Exception {
return new MyEventLoop();
}
/**
* 选择一个EventLoop线程
*
* @return EventLoop
*/
public MyEventLoop next() {
// Chooser
int index = Math.abs(idx.getAndIncrement() % children.length);
return children[index];
}
/**
* 将Channel注册绑定到EventLoop的Selector上
*
* @param channel
* @return
* @throws Exception
*/
public SelectionKey register(SelectableChannel channel, MyServerBootstrap b) throws Exception {
return next().register(channel, b);
}
/**
* 关闭EventLoopGroup
*/
public void shutdown() {
for (MyEventLoop el : children) {
el.shutdown();
}
}
}
class MyEventLoop implements Executor {
/**
* 选择器
*/
private Selector selector;
/**
* 任务队列
*/
private final Queue<Runnable> taskQueue;
/**
* 任务线程
*/
private volatile Thread thread;
/**
* 线程运行状态
*/
private volatile boolean running = false;
/**
* 构造函数
*
* @throws IOException
*/
public MyEventLoop() throws IOException {
// 初始化选择器及任务队列
selector = Selector.open();
taskQueue = new LinkedBlockingQueue<>();
}
/**
* 将任务加入任务队列等待线程执行
*
* @param task 待执行的任务
*/
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
taskQueue.offer(task);
if (!running || thread == null) {
startThread();
}
}
/**
* 开启任务线程
*/
private void startThread() {
if (!running) {
running = true;
if (thread == null) {
thread = new Thread(new Runnable() {
@Override
public void run() {
while (running) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
}
}
});
}
thread.start();
System.out.println("启动:" + thread.getName());
System.out.println("来自:" + Thread.currentThread().getName());
}
}
/**
* 处理SelectedKeys
*/
private void processSelectedKeys() {
try {
selector.select(1000);
// 获取查询结果
Set<SelectionKey> selected = selector.selectedKeys();
// 遍历查询结果
Iterator<SelectionKey> iterator = selected.iterator();
while (iterator.hasNext()) {
// 被封装的查询结果
SelectionKey key = iterator.next();
iterator.remove();
int readyOps = key.readyOps();
// 关注 Read 和 Accept两个事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
try {
((MyServerBootstrap) key.attachment()).channelHandler(key);
if (!key.channel().isOpen()) {
key.cancel(); // 如果关闭了,就取消这个KEY的订阅
}
} catch (Exception ex) {
key.cancel(); // 如果有异常,就取消这个KEY的订阅
}
}
}
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 执行全部任务
*/
private void runAllTasks() {
// 执行队列中的任务
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
}
public SelectionKey register(SelectableChannel channel, MyServerBootstrap b) throws Exception {
// register以任务形式提交,确保selector与正在selector.select()线程不会发生争抢
FutureTask<SelectionKey> f = new FutureTask<>(() -> channel.register(selector, 0, b));
execute(f);
System.out.println("register:" + channel.getClass().getName());
return f.get();
}
/**
* 关闭EventLoop
*/
public void shutdown() {
try {
running = false;
if (thread != null) {
thread.interrupt();
}
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
以上是 Java学习笔记14Netty线程模型及源码解析 的全部内容, 来源链接: utcz.com/z/511194.html