Netty学习笔记(10)——Netty应用示例(1)
本来想先了解Netty组件,然后再学习组件应用的,然后越学越感觉怪异,总感觉少了啥,组件学起来不知道咋用的,想想还是先从Netty应用开始学算了。
自己的技术学习方法:先学习技术的应用,在应用中逐步抛出问题,比如说这个功能是怎么实现的,带着问题去接触底层原理,然后解决问题。
1. 最基础的Netty应用实现——实现请求与响应
1. 首先是环境配置(jdk)要保证没问题,其次,要引入Netty的jar,使用netty-5.0版本的jar。
2. 在使用Netty开始开发之前,先想一想使用jdk中的NIO原生类库开发服务端时所需要的主要步骤:
- 首先,创建ServerSocketChannel,设置为非阻塞模式。
- 绑定监听端口,设置TCP连接参数。
- 创建一个独立IO线程,用于轮询多路复用器Selector。
- 创建Selector,将创建的ServerSocketChannel注册到该Selector中,并且监听ServerSocketChannel上的SelectionKey.OP_ACCEPT事件。
- 启动独立IO线程,在循环体中执行Selector.select()方法,获取到就绪的Channel。
- 每当获取到Channel时,就需要判断Channel的状态,
- 如果是OP_ACCEPT,那么就说明是新的客户端接入,调用ServerSocketChannel的accept()方法,创建新的连接,即SocketChannel对象。创建SocketChannel对象后,可以设置该TCP连接参数,并且要设置为非阻塞模式,设置完毕后,将该SocketChannel注册到Selector中,并且监听OP_READ事件。
- 如果是OP_READ,那么就说明SocketChannel中有已经就绪的数据可以进行读取,此时就需要构造ByteBuffer对象进行读取。
- 如果是OP_WRITE,那么说明SocketChannel还有数据没有发送,需要继续进行发送。
3. 仅仅是一个简单的数据收发,都已经如此复杂,而Netty真的就简单很多了。以一个服务端时间信息查询为例,客户端向服务端发送一条字符串信息,服务端返回当前时间数据作为响应,代码如下
import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @ClassName TimeServer
* @Description: Netty实现的时间服务器服务端示例
* @Author
* @Date 2019/11/19 14:41
* @Modified By:
* @Version V1.0
*/
public class TimeServer {
public static void main(String[] args) {
try {
new TimeServer().bind(8080);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//绑定端口
public void bind(int port) throws InterruptedException {
//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接
EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作
try {
//Netty服务端启动类配置
ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可
serverBootstrap.group(bossGroup, workGroup);//配置线程池
//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接
serverBootstrap.channel(NioServerSocketChannel.class);
//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒
serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
//和上面哪行代码一样,配置TCP连接的backlog参数为1024
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel
serverBootstrap.childHandler(new ChildChannelHandler());
//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取
ChannelFuture future1 = serverBootstrap.bind(port);
//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果,绑定完成后服务端就已经开始运行了
future1.sync();
//closeFuture表示Channel连接被关闭时(比如客户端断开连接),会返回一个ChannelFuture对象
ChannelFuture future2 = future1.channel().closeFuture();
//阻塞当前线程,直到future2总可以获取到关闭连接操作结果
future2.sync();
} finally {
//释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器
ch.pipeline().addLast(new TimeServerHandler());
}
}
//针对于SocketChannel的处理器,处理SocketChannel中的IO操作
private class TimeServerHandler extends ChannelHandlerAdapter {
//读取数据,并对数据进行处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将读取到的数据全部取出
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
//编码解析
String body = new String(req, "utf-8");
System.out.println("server received:" + body);//输出
//响应数据
String res = "server current time:" + System.currentTimeMillis();
ByteBuf data = Unpooled.copiedBuffer(res.getBytes());
ctx.write(data);//注意,这里的写操作不是立即发送的,而是会先保存在一个缓冲区中
}
//该方法表示读取数据操作完毕,读取完毕后进行的操作
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();//调用该方法才能将缓冲区中所有的数据发送给客户端
}
//数据读取过程中发生异常情况,要进行的后续处理都在该方法中
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.Buffer;
/**
* @ClassName TimeClient
* @Description: 客户端
* @Author
* @Date 2019/11/19 16:39
* @Modified By:
* @Version V1.0
*/
public class TimeClient {
public void connect(int port, String host) {
//创建客户端的NIO线程组
EventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();//创建客户端辅助启动类
try {
bootstrap.group(workGroup);//绑定NIO线程组
bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeChannelHandler());
}
});//设置Handler
//bootstrap.bind("8081");
ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接
future.sync();//同步阻塞等待连接建立成功
ChannelFuture f = future.channel().closeFuture();//等待连接被关闭断开
f.sync();//同步阻塞等待连接关闭完成
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workGroup.shutdownGracefully();//清理资源
}
}
public static void main(String[] args) {
new TimeClient().connect(8080, "localhost");
}
private class TimeChannelHandler extends ChannelHandlerAdapter{
//该方法会在建立连接后执行,表示触发了连接建立事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String message = "query server time";
ByteBuf req = Unpooled.copiedBuffer(message.getBytes());
ctx.writeAndFlush(req);//向服务端发送数据
}
//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] buffer = new byte[buf.readableBytes()];
buf.readBytes(buffer);
String message = new String(buffer, "utf-8");
System.out.println(message);//读取服务端发送过来的数据,并编码输出
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
4. Netty开发主要步骤:可以参考Netty的线程模型,帮助理解
- 服务端:
- 首先创建两个处理线程组,也就是Reactor模型中的主从多线程模型。
- 其次,创建辅助启动类ServerBootstrap,该类的主要作用就是辅助创建并启动Netty服务端的类,通过该类我们可以设置一系列的配置并启动服务端,降低开发复杂度。
- 配置绑定线程组,主线程组用来接收并创建新的连接,工作线程组主要进行IO操作。
- 设置初始创建的channel类型为NioServerSocketChannel,该类对应着NIO中的ServerSocketChannel,用于接收客户端的连接请求,并创建连接(SocketChannel)。
- 设置NioServerSocketChannel的TCP连接参数,比如CONNECT_TIMEOUT_MILLIS连接超时判断等。
- 设置对于与客户端建立连接后创建的NIOSocketChannel对象(所以是childHandler方法,因为是NioServerSocketChannel“创建”的),绑定自定义的Handler处理链,Netty中对于SocketChannel上的数据IO操作采用了类似于Servlet中的Filter机制,使用了责任链设计模式的变形。
- 绑定监听的端口,启动服务端。
- 如果连接被关闭,那么就清理资源,退出
2.客户端:
- 首先,创建一个工作线程组。
- 其次,创建辅助启动类Bootstrap。
- 指定初始建立连接创建的Channel类型为NioSocketChannel
- 调用handler方法绑定NioSocketChannel的处理链,因为是为NioSocketChannel绑定,所以是handler()方法,而不是childHandler()方法。
- 调用connect()方法与服务端建立连接。
- 如果连接被关闭,那么就清理资源,退出
2. 使用Netty解决半包读取(粘包与拆包)的问题
1. TCP中的粘包与拆包问题
TCP在网络模型中,属于传输层协议,即其只负责数据的传输,会将要传输的数据以流的形式在网络间进行传输,就像一条河流一样是连续的,对于TCP来说,并不了解来自应用层的数据含义,再加上TCP的窗口滑动机制,TCP可能会把一条完整的数据流在读取时读到部分数据,也有可能在读取时一次性读取到多段数据流(比较难懂的话可以去简单了解一下NIO中设计的零拷贝实现原理,设计到内核缓冲区和应用缓冲区,以及TCP的部分知识)。
形象一点来说,水龙头就相当于建立的网络连接,在系统内核底层会有一个缓冲区,也就是相当于有一个“缸”,这个缸直接放在水龙头下面的,水龙头打开后(建立连接),水流出来(接收数据)会直接先到缸内暂存(内核缓冲区接收),然后是用户空间,也就是应用程序会用一个“盆”(用户空间的缓冲区)去这个缸里取水(内核缓冲区数据复制到用户缓冲区),然后在代码中对这部分数据进行处理(编解码等)。但是问题在于,数据发送方发送的数据通常都会是一个一个整体的进行发送,对应着数据接收方也要保证接收一个一个完整的整体,这样才能保证正确的数据编解码,但是对于TCP协议来说,由于处于传输层,无法理解这些数据在上层应用层中的含义,无法进行分割处理,只负责数据的传输,在数据接收方,你能保证每次拿“盆”取出的数据是发送方发送的单条完整消息数据吗?
服务端接收读取客户端发送的数据时,有可能会发生以下几种情况:
(1)服务端接收并读取了客户端发送的一条完整消息数据。比如客户端发送了一条“hello”的字符串数据,那么服务端也接受到了一条“hello”的消息数据。
(2)客户端发送了两次消息数据,两条消息数据都是“hello”,但是服务端只接收到了一条消息,消息内容为”hellohello”。(粘包)
(3)客户端发送了两次消息数据,两条消息数据都是“hello”,服务端也接受到了两条消息,但是第一次接受到的消息数据是“helloh”,第二次接收到了“ello”。(拆包)
如果是发送的不包含汉字字符的字符串,那么半包读取还算能够正常进行,但是一旦包含汉字字符,或者是某类文件的字节码数据,发生半包读取后,对数据进行编解码肯定是会发生异常的。
粘包拆包问题的演示代码:正常情况下(或者说我们希望不发生拆包粘包的情况下),客户端发送100条信息,请求服务端时间;服务端接收100条信息,每接收一条就响应一次时间信息,但因为拆包和粘包,结果显然不是这样。
import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @ClassName TimeServer
* @Description: TCP粘包拆包引起问题的演示
* @Author
* @Date 2019/11/20 13:59
* @Modified By:
* @Version V1.0
*/
public class TimeServer {
public static void main(String[] args) {
try {
new netty_demo.demo1.TimeServer().bind(8080);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//绑定端口
public void bind(int port) throws InterruptedException {
//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接
EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作
try {
//Netty服务端启动类配置
ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可
serverBootstrap.group(bossGroup, workGroup);//配置线程池
//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接
serverBootstrap.channel(NioServerSocketChannel.class);
//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒
serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
//和上面哪行代码一样,配置TCP连接的backlog参数为1024
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel
serverBootstrap.childHandler(new ChildChannelHandler());
//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取
ChannelFuture future1 = serverBootstrap.bind(port);
//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果
future1.sync();
//closeFuture表示关闭连接,该方法会返回一个ChannelFuture,关闭连接操作结果会存储在该对象中
ChannelFuture future2 = future1.channel().closeFuture();
//阻塞当前线程,直到future2总可以获取到关闭连接操作结果
future2.sync();
} finally {
//释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器
ch.pipeline().addLast(new TimeServerHandler());
}
}
//针对于SocketChannel的处理器,处理SocketChannel中的IO操作
private class TimeServerHandler extends ChannelHandlerAdapter {
private int count = 0;
//读取数据,并对数据进行处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将读取到的数据全部取出
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
//编码解析
String body = new String(req, "utf-8");
System.out.println("server received:" + body + ";count="+ ++count);//输出
//响应数据
if ("query server time".equals(body)) {
String res = "server current time:" + System.currentTimeMillis();
ByteBuf data = Unpooled.copiedBuffer(res.getBytes());
ctx.writeAndFlush(data);//注意,这里的写操作不是立即发送的,而是会先保存在一个缓冲区中
} else {
String res = "bad req:" + System.currentTimeMillis();
ByteBuf data = Unpooled.copiedBuffer(res.getBytes());
ctx.writeAndFlush(data);//注意,这里的写操作不是立即发送的,而是会先保存在一个缓冲区中
}
}
//数据读取过程中发生异常情况,要进行的后续处理都在该方法中
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
public class TimeClient { public void connect(int port, String host) {
//创建客户端的NIO线程组
EventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();//创建客户辅助启动类
try {
bootstrap.group(workGroup);//绑定NIO线程组
bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeChannelHandler());
}
});//设置Handler
//bootstrap.bind("8081");
ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接
future.sync();//同步阻塞等待连接建立成功
ChannelFuture f = future.channel().closeFuture();//等待连接关闭断开
f.sync();//同步阻塞等待连接关闭完成
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workGroup.shutdownGracefully();//清理资源
}
}
public static void main(String[] args) {
new TimeClient().connect(8080, "localhost");
}
private class TimeChannelHandler extends ChannelHandlerAdapter {
private int count=0;
//该方法会在建立连接后执行,表示触发了连接建立事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=1;i<=100;i++){
String message = "query server time";
ByteBuf req = Unpooled.copiedBuffer(message.getBytes());
ctx.writeAndFlush(req);//向服务端发送数据
}
}
//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] buffer = new byte[buf.readableBytes()];
buf.readBytes(buffer);
String message = new String(buffer, "utf-8");
System.out.println(message+";count="+ ++count);//读取服务端发送过来的数据,并编码输出
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
2. TCP中的粘包与拆包问题的解决方式
粘包与拆包问题的解决方式有以下几种:
(1)消息定长,即消息数据长度固定。
(2)以特定字符作为分隔符或者说结束标志,比如换行符。
(3)将消息分为消息头和消息体,消息头中包含对消息体的描述,比如对于消息体的数据长度等信息。(比如HTTP协议、TCP协议)
(4)定义应用层协议。
1. 指定消息结束标志字符解决文本消息数据的半包读取
对于文本消息数据,也就是字符串数据来说,比较容易解决,可以使用上述的第二种方法,指定一个消息结束标志字符,比如常见的换行符
,Netty中正好提供了以换行符作为消息结束标志的Handler类,通过io.netty.handler.codec.LineBasedFrameDecoder类来解决拆包粘板问题。代码如下所示
public class TimeClient { public void connect(int port, String host) {
//创建客户端的NIO线程组
EventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();//创建客户辅助启动类
try {
bootstrap.group(workGroup);//绑定NIO线程组
bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeChannelHandler());
}
});//设置Handler
//bootstrap.bind("8081");
ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接
future.sync();//同步阻塞等待连接建立成功
ChannelFuture f = future.channel().closeFuture();//等待连接关闭断开
f.sync();//同步阻塞等待连接关闭完成
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workGroup.shutdownGracefully();//清理资源
}
}
public static void main(String[] args) {
new TimeClient().connect(8080, "localhost");
}
private class TimeChannelHandler extends ChannelHandlerAdapter{
private int count=0;
//该方法会在建立连接后执行,表示触发了连接建立事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=1;i<=100;i++){
String message = "query server time"+i+System.getProperty("line.separator");
ByteBuf req = Unpooled.copiedBuffer(message.getBytes());
ctx.writeAndFlush(req);//向服务端发送数据
}
}
//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message+";count="+ ++count);//读取服务端发送过来的数据,
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
/** * Netty实现的时间服务器服务端粘包拆包问题修正版
*/
public class TimeServer {
public static void main(String[] args) {
try {
new TimeServer().bind(8080);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//绑定端口
public void bind(int port) throws InterruptedException {
//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接
EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作
try {
//Netty服务端启动类配置
ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可
serverBootstrap.group(bossGroup, workGroup);//配置线程池
//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接
serverBootstrap.channel(NioServerSocketChannel.class);
//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒
serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
//和上面哪行代码一样,配置TCP连接的backlog参数为1024
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel
serverBootstrap.childHandler(new ChildChannelHandler());
//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取
ChannelFuture future1 = serverBootstrap.bind(port);
//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果
future1.sync();
//closeFuture表示关闭连接,该方法会返回一个ChannelFuture,关闭连接操作结果会存储在该对象中
ChannelFuture future2 = future1.channel().closeFuture();
//阻塞当前线程,直到future2总可以获取到关闭连接操作结果
future2.sync();
} finally {
//释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器
ch.pipeline().addLast(new TimeServerHandler());
}
}
//针对于SocketChannel的处理器,处理SocketChannel中的IO操作
private class TimeServerHandler extends ChannelHandlerAdapter {
//读取数据,并对数据进行处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("server received:" + body);//输出
//响应数据
String res = "server current time:" + System.currentTimeMillis()+System.getProperty("line.separator");
ByteBuf data = Unpooled.copiedBuffer(res.getBytes());
ctx.writeAndFlush(data);
}
//数据读取过程中发生异常情况,要进行的后续处理都在该方法中
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
该类LineBasedFrameDecoder是已经默认换行符为消息结束标志,无法修改;Netty还提供了另外一个类,可以自由指定结束标志字符,这个类就是DelimiterBasedFrameDecoder,可以通过该类指定字符$为消息结束标志字符。代码如下
public class TimeClient { public void connect(int port, String host) {
//创建客户端的NIO线程组
EventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();//创建客户辅助启动类
try {
bootstrap.group(workGroup);//绑定NIO线程组
bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeChannelHandler());
}
});//设置Handler
//bootstrap.bind("8081");
ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接
future.sync();//同步阻塞等待连接建立成功
ChannelFuture f = future.channel().closeFuture();//等待连接关闭断开
f.sync();//同步阻塞等待连接关闭完成
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workGroup.shutdownGracefully();//清理资源
}
}
public static void main(String[] args) {
new TimeClient().connect(8080, "localhost");
}
private class TimeChannelHandler extends ChannelHandlerAdapter{
private int count=0;
//该方法会在建立连接后执行,表示触发了连接建立事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=1;i<=100;i++){
String message = "query server time"+i+"$";
ByteBuf req = Unpooled.copiedBuffer(message.getBytes());
ctx.writeAndFlush(req);//向服务端发送数据
}
}
//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message+";count="+ ++count);//读取服务端发送过来的数据,
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
public class TimeServer { public static void main(String[] args) {
try {
new TimeServer().bind(8080);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//绑定端口
public void bind(int port) throws InterruptedException {
//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接
EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作
try {
//Netty服务端启动类配置
ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可
serverBootstrap.group(bossGroup, workGroup);//配置线程池
//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接
serverBootstrap.channel(NioServerSocketChannel.class);
//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒
serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
//和上面哪行代码一样,配置TCP连接的backlog参数为1024
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel
serverBootstrap.childHandler(new ChildChannelHandler());
//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取
ChannelFuture future1 = serverBootstrap.bind(port);
//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果
future1.sync();
//closeFuture表示关闭连接,该方法会返回一个ChannelFuture,关闭连接操作结果会存储在该对象中
ChannelFuture future2 = future1.channel().closeFuture();
//阻塞当前线程,直到future2总可以获取到关闭连接操作结果
future2.sync();
} finally {
//释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
ch.pipeline().addLast(new StringDecoder());
//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器
ch.pipeline().addLast(new TimeServerHandler());
}
}
//针对于SocketChannel的处理器,处理SocketChannel中的IO操作
private class TimeServerHandler extends ChannelHandlerAdapter {
//读取数据,并对数据进行处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("server received:" + body);//输出
//响应数据
String res = "server current time:" + System.currentTimeMillis()+"$";
ByteBuf data = Unpooled.copiedBuffer(res.getBytes());
ctx.writeAndFlush(data);
}
//数据读取过程中发生异常情况,要进行的后续处理都在该方法中
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
2. 消息定长解决消息数据的半包读取问题
实际上,如果你可以确定每次发送的最大消息长度的话,那么可以采用消息定长的方式来解决,当然该方式必须保证消息定长的长度不会很大,可以使用Netty提供的FixedLengthFrameDecoder类实现消息定长。代码如下
public class TimeClient { public void connect(int port, String host) {
//创建客户端的NIO线程组
EventLoopGroup workGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();//创建客户辅助启动类
try {
bootstrap.group(workGroup);//绑定NIO线程组
bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeChannelHandler());
}
});//设置Handler
//bootstrap.bind("8081");
ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接
future.sync();//同步阻塞等待连接建立成功
ChannelFuture f = future.channel().closeFuture();//等待连接关闭断开
f.sync();//同步阻塞等待连接关闭完成
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workGroup.shutdownGracefully();//清理资源
}
}
public static void main(String[] args) {
new TimeClient().connect(8080, "localhost");
}
private class TimeChannelHandler extends ChannelHandlerAdapter{
private int count=0;
//该方法会在建立连接后执行,表示触发了连接建立事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=1;i<=100;i++){
String message = "query server time"+i+"$";
ByteBuf req = Unpooled.copiedBuffer(message.getBytes());
ctx.writeAndFlush(req);//向服务端发送数据
}
}
//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message+";count="+ ++count);//读取服务端发送过来的数据,
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
public class TimeServer { public static void main(String[] args) {
try {
new TimeServer().bind(8080);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//绑定端口
public void bind(int port) throws InterruptedException {
//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接
EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作
try {
//Netty服务端启动类配置
ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可
serverBootstrap.group(bossGroup, workGroup);//配置线程池
//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接
serverBootstrap.channel(NioServerSocketChannel.class);
//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒
serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
//和上面哪行代码一样,配置TCP连接的backlog参数为1024
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel
serverBootstrap.childHandler(new ChildChannelHandler());
//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取
ChannelFuture future1 = serverBootstrap.bind(port);
//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果
future1.sync();
//closeFuture表示关闭连接,该方法会返回一个ChannelFuture,关闭连接操作结果会存储在该对象中
ChannelFuture future2 = future1.channel().closeFuture();
//阻塞当前线程,直到future2总可以获取到关闭连接操作结果
future2.sync();
} finally {
//释放线程池资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器
ch.pipeline().addLast(new TimeServerHandler());
}
}
//针对于SocketChannel的处理器,处理SocketChannel中的IO操作
private class TimeServerHandler extends ChannelHandlerAdapter {
//读取数据,并对数据进行处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("server received:" + body);//输出
//响应数据
String res = "server current time:" + System.currentTimeMillis()+"$";
ByteBuf data = Unpooled.copiedBuffer(res.getBytes());
ctx.writeAndFlush(data);
}
//数据读取过程中发生异常情况,要进行的后续处理都在该方法中
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
3. 定义应用层协议,规定消息结构
上面两种方式基本都适用于比较小的文本消息数据传输,但如果针对于复杂的情况,比如既能传输文本数据,也能传输各种文件,那么就比较复杂了,因为TCP协议属于传输层协议,其不会理解传输的二进制数据代表着什么,所以就必须要定义一个上层应用层的协议,来对TCP接收的数据进行解析。
协议就不细说了,简单来说就是客户端服务端双方的约定,协议规定了客户端发送数据的格式,也规定了服务端解析数据的方式。常见的HTTP协议就是一个很好的例子,其采用了多种方式解决半包读取的问题,首先就是指定了消息的结构(比如请求头、请求行、请求体),并且对于其中的请求头、请求头和请求行肯定也需要一个结束符表示,消息子结构之间的间隔会添加一个换行符来表示结束。可以去详细了解一下Http协议的格式。
Http协议相应的处理Handler肯定不用说,Netty中已经准备好了,只需直接调用就可以实现一个简单的Http协议的文件服务器。代码如下
public class HttpFileServer { private static final String DEFAULT_URL = "/src/netty_demo/";
public static void main(String[] args) {
int port = 8080;
try {
new HttpFileServer().run(port, DEFAULT_URL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//服务器启动方法
public void run(final int port,final String url) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(bossGroup, workGroup);
bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("http-decoder",
new HttpRequestDecoder());//添加HTTP请求消息解码器
ch.pipeline().addLast("http-aggregator",
new HttpObjectAggregator(65536));//该解码器用于将多个Http消息合并为一个完整的HTTP消息,
ch.pipeline().addLast("http-encoder",
new HttpResponseEncoder());//HTTP响应数据编码器
ch.pipeline().addLast("http-chunked",
new ChunkedWriteHandler());//该handler用于解决异步传输大码流(如文件传输),但不会占用太多内存,防止内存溢出
ch.pipeline().addLast("fileServerHandler",
new HttpFileServerHandler(url));
}
});
ChannelFuture cf = bootstrap.bind(port).sync();
System.out.println("HTTP文件服务器启动,地址为:http://localhost:8080"+url);
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
//返回HTTP响应,表示该请求错误
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status){
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
status, Unpooled.copiedBuffer(status.toString()+"
", CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
//设置HTTP响应的响应头
private static void setContentTypeHeader(HttpResponse response, File file) {
MimetypesFileTypeMap mimetypesFileTypeMap = new MimetypesFileTypeMap();
response.headers().set(HttpHeaderNames.CONTENT_TYPE, mimetypesFileTypeMap.getContentType(file.getPath()));
}
//响应一个重定向消息
private static void sendRedirect(ChannelHandlerContext ctx, String uri) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FOUND);
response.headers().set(HttpHeaderNames.LOCATION, uri);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
//返回文件列表的html页面
private static void sendFileList(ChannelHandlerContext ctx, File dir) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
StringBuilder html = new StringBuilder();
String dirPath = dir.getPath();
html.append("<!DOCTYPE html>
" +
"<html>
" +
"<head>");
html.append("<title>"+dirPath+"目录:"+"</title>");
html.append("</head>");
html.append("<body>
");
html.append("<h3>").append(dirPath).append("目录:").append("</h3>
");
html.append("<ul>");
html.append("<li>链接:<a href="../">..</a></li>
");
for (File f:dir.listFiles()) {
if (f.isHidden() || !f.canRead()) {
continue;
}
String name = f.getName();
html.append("<li>链接:<a href=""+name+"">"+name+"</a></li>
");
}
html.append("</ul>");
html.append("</body>
");
html.append("</html>");
ByteBuf buf = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8);
response.content().writeBytes(buf);
buf.release();
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
//HTTP请求入栈后进行的一系列操作,包括进行响应消息
private class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private String url;
public HttpFileServerHandler(String url) {
this.url = url;
}
//解码uri,并转为本地文件目录
private String decodeUri(String uri) {
try {
uri = URLDecoder.decode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
try {
uri = URLDecoder.decode(uri, "ISO-8859-1");
} catch (UnsupportedEncodingException ex) {
throw new RuntimeException("请求路径异常");
}
}
if (!uri.startsWith(DEFAULT_URL)) {
return null;
}
if (!uri.startsWith("/")) {
return null;
}
uri = uri.replace("/", File.separatorChar);
String path = System.getProperty("user.dir")+uri;
return path;
}
//接收到请求消息后进行的处理
@Override
protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
//首先判断请求路径正确性
if(request.decoderResult().isFailure()){
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
if(request.method()!=HttpMethod.GET) {
sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
return;
}
String uri = request.uri();
String path = decodeUri(uri);
if (path == null) {
sendError(ctx, HttpResponseStatus.FORBIDDEN);
return;
}
File file = new File(path);
if (file.isHidden() || !file.exists()) {
sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
//如果请求路径对应的本地文件是一个文件夹(目录)那么就将该目录下的文件变成一个列表展示在HTML页面中,
//并将该页面作为响应消息返回给客户端
if (file.isDirectory()) {
if(uri.endsWith("/")) {
sendFileList(ctx, file);
} else {
sendRedirect(ctx, uri+"/");
}
return;
}
if (!file.isFile()) {
sendError(ctx, HttpResponseStatus.FORBIDDEN);
return;
}
//如果请求路径对应的目标是文件,那么就开启文件传输
RandomAccessFile accessFile = null;
try {
accessFile = new RandomAccessFile(file, "r");//以只读方式打开文件
}catch (FileNotFoundException e) {
sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
long fileLength = accessFile.length();
//创建并设置响应头和响应行
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
setContentTypeHeader(response, file);
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength+"");
String connection = request.headers().get(HttpHeaderNames.CONNECTION).toString();
if(connection.contentEquals(HttpHeaderValues.KEEP_ALIVE)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
ctx.write(response);
//创建并设置响应体,即为文件数据
ChannelFuture sendFileFuture;
sendFileFuture = ctx.write(new ChunkedFile(accessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
if (total < 0) {
System.out.println("文件传输中,已发送:"+progress+"byte");
}else {
System.out.println("文件传输中,已发送:"+progress+"/"+total+"byte");
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.out.println("文件传输完成");
}
});
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if(connection.contentEquals(HttpHeaderValues.KEEP_ALIVE)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
}
}
如果看完了Http协议服务器代码,你会觉得似乎nginx中对于反向代理的实现代码似乎就是这样的,当然,其中的路径等参数并不是写死了,而是通过nginx的配置文件定义,并且定义一系列的负载均衡规则等,实际代码肯定要复杂的多,但是原理是一样的。
3. 总结
在对Netty进行使用之后,相信10个人里10个人都不会愿意再去使用NIO的原生类库了,Netty相较于NIO原生类库的最大优势就在这一点上,开发复杂度低,而且类库齐全,功能强大。
以上是 Netty学习笔记(10)——Netty应用示例(1) 的全部内容, 来源链接: utcz.com/z/511126.html