Netty学习笔记(10)——Netty应用示例(1)

编程

本来想先了解Netty组件,然后再学习组件应用的,然后越学越感觉怪异,总感觉少了啥,组件学起来不知道咋用的,想想还是先从Netty应用开始学算了。

自己的技术学习方法:先学习技术的应用,在应用中逐步抛出问题,比如说这个功能是怎么实现的,带着问题去接触底层原理,然后解决问题。

1. 最基础的Netty应用实现——实现请求与响应

1. 首先是环境配置(jdk)要保证没问题,其次,要引入Netty的jar,使用netty-5.0版本的jar。

2. 在使用Netty开始开发之前,先想一想使用jdk中的NIO原生类库开发服务端时所需要的主要步骤:

  1. 首先,创建ServerSocketChannel,设置为非阻塞模式。
  2. 绑定监听端口,设置TCP连接参数。
  3. 创建一个独立IO线程,用于轮询多路复用器Selector。
  4. 创建Selector,将创建的ServerSocketChannel注册到该Selector中,并且监听ServerSocketChannel上的SelectionKey.OP_ACCEPT事件。
  5. 启动独立IO线程,在循环体中执行Selector.select()方法,获取到就绪的Channel。
  6. 每当获取到Channel时,就需要判断Channel的状态,

    1. 如果是OP_ACCEPT,那么就说明是新的客户端接入,调用ServerSocketChannel的accept()方法,创建新的连接,即SocketChannel对象。创建SocketChannel对象后,可以设置该TCP连接参数,并且要设置为非阻塞模式,设置完毕后,将该SocketChannel注册到Selector中,并且监听OP_READ事件。
    2. 如果是OP_READ,那么就说明SocketChannel中有已经就绪的数据可以进行读取,此时就需要构造ByteBuffer对象进行读取。
    3. 如果是OP_WRITE,那么说明SocketChannel还有数据没有发送,需要继续进行发送。

3. 仅仅是一个简单的数据收发,都已经如此复杂,而Netty真的就简单很多了。以一个服务端时间信息查询为例,客户端向服务端发送一条字符串信息,服务端返回当前时间数据作为响应,代码如下

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

* @ClassName TimeServer

* @Description: Netty实现的时间服务器服务端示例

* @Author

* @Date 2019/11/19 14:41

* @Modified By:

* @Version V1.0

*/

public class TimeServer {

public static void main(String[] args) {

try {

new TimeServer().bind(8080);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

//绑定端口

public void bind(int port) throws InterruptedException {

//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组

EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接

EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作

try {

//Netty服务端启动类配置

ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可

serverBootstrap.group(bossGroup, workGroup);//配置线程池

//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接

serverBootstrap.channel(NioServerSocketChannel.class);

//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒

serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);

//和上面哪行代码一样,配置TCP连接的backlog参数为1024

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);

//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel

serverBootstrap.childHandler(new ChildChannelHandler());

//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取

ChannelFuture future1 = serverBootstrap.bind(port);

//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果,绑定完成后服务端就已经开始运行了

future1.sync();

//closeFuture表示Channel连接被关闭时(比如客户端断开连接),会返回一个ChannelFuture对象

ChannelFuture future2 = future1.channel().closeFuture();

//阻塞当前线程,直到future2总可以获取到关闭连接操作结果

future2.sync();

} finally {

//释放线程池资源

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器

ch.pipeline().addLast(new TimeServerHandler());

}

}

//针对于SocketChannel的处理器,处理SocketChannel中的IO操作

private class TimeServerHandler extends ChannelHandlerAdapter {

//读取数据,并对数据进行处理

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//将读取到的数据全部取出

ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

//编码解析

String body = new String(req, "utf-8");

System.out.println("server received:" + body);//输出

//响应数据

String res = "server current time:" + System.currentTimeMillis();

ByteBuf data = Unpooled.copiedBuffer(res.getBytes());

ctx.write(data);//注意,这里的写操作不是立即发送的,而是会先保存在一个缓冲区中

}

//该方法表示读取数据操作完毕,读取完毕后进行的操作

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();//调用该方法才能将缓冲区中所有的数据发送给客户端

}

//数据读取过程中发生异常情况,要进行的后续处理都在该方法中

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

import io.netty.bootstrap.Bootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.Buffer;

/**

* @ClassName TimeClient

* @Description: 客户端

* @Author

* @Date 2019/11/19 16:39

* @Modified By:

* @Version V1.0

*/

public class TimeClient {

public void connect(int port, String host) {

//创建客户端的NIO线程组

EventLoopGroup workGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();//创建客户端辅助启动类

try {

bootstrap.group(workGroup);//绑定NIO线程组

bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {

@Override

protected void initChannel(NioSocketChannel ch) throws Exception {

ch.pipeline().addLast(new TimeChannelHandler());

}

});//设置Handler

//bootstrap.bind("8081");

ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接

future.sync();//同步阻塞等待连接建立成功

ChannelFuture f = future.channel().closeFuture();//等待连接被关闭断开

f.sync();//同步阻塞等待连接关闭完成

} catch (InterruptedException e) {

e.printStackTrace();

}finally {

workGroup.shutdownGracefully();//清理资源

}

}

public static void main(String[] args) {

new TimeClient().connect(8080, "localhost");

}

private class TimeChannelHandler extends ChannelHandlerAdapter{

//该方法会在建立连接后执行,表示触发了连接建立事件

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

String message = "query server time";

ByteBuf req = Unpooled.copiedBuffer(message.getBytes());

ctx.writeAndFlush(req);//向服务端发送数据

}

//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf) msg;

byte[] buffer = new byte[buf.readableBytes()];

buf.readBytes(buffer);

String message = new String(buffer, "utf-8");

System.out.println(message);//读取服务端发送过来的数据,并编码输出

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

4. Netty开发主要步骤:可以参考Netty的线程模型,帮助理解

  1. 服务端:

  • 首先创建两个处理线程组,也就是Reactor模型中的主从多线程模型。
  • 其次,创建辅助启动类ServerBootstrap,该类的主要作用就是辅助创建并启动Netty服务端的类,通过该类我们可以设置一系列的配置并启动服务端,降低开发复杂度。
  • 配置绑定线程组,主线程组用来接收并创建新的连接,工作线程组主要进行IO操作。
  • 设置初始创建的channel类型为NioServerSocketChannel,该类对应着NIO中的ServerSocketChannel,用于接收客户端的连接请求,并创建连接(SocketChannel)。
  • 设置NioServerSocketChannel的TCP连接参数,比如CONNECT_TIMEOUT_MILLIS连接超时判断等。
  • 设置对于与客户端建立连接后创建的NIOSocketChannel对象(所以是childHandler方法,因为是NioServerSocketChannel“创建”的),绑定自定义的Handler处理链,Netty中对于SocketChannel上的数据IO操作采用了类似于Servlet中的Filter机制,使用了责任链设计模式的变形。
  • 绑定监听的端口,启动服务端。
  • 如果连接被关闭,那么就清理资源,退出

    2.客户端:

  • 首先,创建一个工作线程组。
  • 其次,创建辅助启动类Bootstrap。
  • 指定初始建立连接创建的Channel类型为NioSocketChannel
  • 调用handler方法绑定NioSocketChannel的处理链,因为是为NioSocketChannel绑定,所以是handler()方法,而不是childHandler()方法。
  • 调用connect()方法与服务端建立连接。
  • 如果连接被关闭,那么就清理资源,退出

2. 使用Netty解决半包读取(粘包与拆包)的问题

1. TCP中的粘包与拆包问题

    TCP在网络模型中,属于传输层协议,即其只负责数据的传输,会将要传输的数据以流的形式在网络间进行传输,就像一条河流一样是连续的,对于TCP来说,并不了解来自应用层的数据含义,再加上TCP的窗口滑动机制,TCP可能会把一条完整的数据流在读取时读到部分数据,也有可能在读取时一次性读取到多段数据流(比较难懂的话可以去简单了解一下NIO中设计的零拷贝实现原理,设计到内核缓冲区和应用缓冲区,以及TCP的部分知识)。

    形象一点来说,水龙头就相当于建立的网络连接,在系统内核底层会有一个缓冲区,也就是相当于有一个“缸”,这个缸直接放在水龙头下面的,水龙头打开后(建立连接),水流出来(接收数据)会直接先到缸内暂存(内核缓冲区接收),然后是用户空间,也就是应用程序会用一个“盆”(用户空间的缓冲区)去这个缸里取水(内核缓冲区数据复制到用户缓冲区),然后在代码中对这部分数据进行处理(编解码等)。但是问题在于,数据发送方发送的数据通常都会是一个一个整体的进行发送,对应着数据接收方也要保证接收一个一个完整的整体,这样才能保证正确的数据编解码,但是对于TCP协议来说,由于处于传输层,无法理解这些数据在上层应用层中的含义,无法进行分割处理,只负责数据的传输,在数据接收方,你能保证每次拿“盆”取出的数据是发送方发送的单条完整消息数据吗?

    服务端接收读取客户端发送的数据时,有可能会发生以下几种情况:

(1)服务端接收并读取了客户端发送的一条完整消息数据。比如客户端发送了一条“hello”的字符串数据,那么服务端也接受到了一条“hello”的消息数据。

(2)客户端发送了两次消息数据,两条消息数据都是“hello”,但是服务端只接收到了一条消息,消息内容为”hellohello”。(粘包)

(3)客户端发送了两次消息数据,两条消息数据都是“hello”,服务端也接受到了两条消息,但是第一次接受到的消息数据是“helloh”,第二次接收到了“ello”。(拆包)

    如果是发送的不包含汉字字符的字符串,那么半包读取还算能够正常进行,但是一旦包含汉字字符,或者是某类文件的字节码数据,发生半包读取后,对数据进行编解码肯定是会发生异常的。

    粘包拆包问题的演示代码:正常情况下(或者说我们希望不发生拆包粘包的情况下),客户端发送100条信息,请求服务端时间;服务端接收100条信息,每接收一条就响应一次时间信息,但因为拆包和粘包,结果显然不是这样。

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

* @ClassName TimeServer

* @Description: TCP粘包拆包引起问题的演示

* @Author

* @Date 2019/11/20 13:59

* @Modified By:

* @Version V1.0

*/

public class TimeServer {

public static void main(String[] args) {

try {

new netty_demo.demo1.TimeServer().bind(8080);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

//绑定端口

public void bind(int port) throws InterruptedException {

//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组

EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接

EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作

try {

//Netty服务端启动类配置

ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可

serverBootstrap.group(bossGroup, workGroup);//配置线程池

//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接

serverBootstrap.channel(NioServerSocketChannel.class);

//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒

serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);

//和上面哪行代码一样,配置TCP连接的backlog参数为1024

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);

//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel

serverBootstrap.childHandler(new ChildChannelHandler());

//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取

ChannelFuture future1 = serverBootstrap.bind(port);

//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果

future1.sync();

//closeFuture表示关闭连接,该方法会返回一个ChannelFuture,关闭连接操作结果会存储在该对象中

ChannelFuture future2 = future1.channel().closeFuture();

//阻塞当前线程,直到future2总可以获取到关闭连接操作结果

future2.sync();

} finally {

//释放线程池资源

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器

ch.pipeline().addLast(new TimeServerHandler());

}

}

//针对于SocketChannel的处理器,处理SocketChannel中的IO操作

private class TimeServerHandler extends ChannelHandlerAdapter {

private int count = 0;

//读取数据,并对数据进行处理

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//将读取到的数据全部取出

ByteBuf buf = (ByteBuf) msg;

byte[] req = new byte[buf.readableBytes()];

buf.readBytes(req);

//编码解析

String body = new String(req, "utf-8");

System.out.println("server received:" + body + ";count="+ ++count);//输出

//响应数据

if ("query server time".equals(body)) {

String res = "server current time:" + System.currentTimeMillis();

ByteBuf data = Unpooled.copiedBuffer(res.getBytes());

ctx.writeAndFlush(data);//注意,这里的写操作不是立即发送的,而是会先保存在一个缓冲区中

} else {

String res = "bad req:" + System.currentTimeMillis();

ByteBuf data = Unpooled.copiedBuffer(res.getBytes());

ctx.writeAndFlush(data);//注意,这里的写操作不是立即发送的,而是会先保存在一个缓冲区中

}

}

//数据读取过程中发生异常情况,要进行的后续处理都在该方法中

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

public class TimeClient {

public void connect(int port, String host) {

//创建客户端的NIO线程组

EventLoopGroup workGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();//创建客户辅助启动类

try {

bootstrap.group(workGroup);//绑定NIO线程组

bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {

@Override

protected void initChannel(NioSocketChannel ch) throws Exception {

ch.pipeline().addLast(new TimeChannelHandler());

}

});//设置Handler

//bootstrap.bind("8081");

ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接

future.sync();//同步阻塞等待连接建立成功

ChannelFuture f = future.channel().closeFuture();//等待连接关闭断开

f.sync();//同步阻塞等待连接关闭完成

} catch (InterruptedException e) {

e.printStackTrace();

}finally {

workGroup.shutdownGracefully();//清理资源

}

}

public static void main(String[] args) {

new TimeClient().connect(8080, "localhost");

}

private class TimeChannelHandler extends ChannelHandlerAdapter {

private int count=0;

//该方法会在建立连接后执行,表示触发了连接建立事件

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

for(int i=1;i<=100;i++){

String message = "query server time";

ByteBuf req = Unpooled.copiedBuffer(message.getBytes());

ctx.writeAndFlush(req);//向服务端发送数据

}

}

//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf) msg;

byte[] buffer = new byte[buf.readableBytes()];

buf.readBytes(buffer);

String message = new String(buffer, "utf-8");

System.out.println(message+";count="+ ++count);//读取服务端发送过来的数据,并编码输出

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

2. TCP中的粘包与拆包问题的解决方式

粘包与拆包问题的解决方式有以下几种:

(1)消息定长,即消息数据长度固定。

(2)以特定字符作为分隔符或者说结束标志,比如换行符。

(3)将消息分为消息头和消息体,消息头中包含对消息体的描述,比如对于消息体的数据长度等信息。(比如HTTP协议、TCP协议)

(4)定义应用层协议。

    1. 指定消息结束标志字符解决文本消息数据的半包读取

    对于文本消息数据,也就是字符串数据来说,比较容易解决,可以使用上述的第二种方法,指定一个消息结束标志字符,比如常见的换行符

,Netty中正好提供了以换行符作为消息结束标志的Handler类,通过io.netty.handler.codec.LineBasedFrameDecoder类来解决拆包粘板问题。代码如下所示

public class TimeClient {

public void connect(int port, String host) {

//创建客户端的NIO线程组

EventLoopGroup workGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();//创建客户辅助启动类

try {

bootstrap.group(workGroup);//绑定NIO线程组

bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {

@Override

protected void initChannel(NioSocketChannel ch) throws Exception {

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

ch.pipeline().addLast(new StringDecoder());

ch.pipeline().addLast(new TimeChannelHandler());

}

});//设置Handler

//bootstrap.bind("8081");

ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接

future.sync();//同步阻塞等待连接建立成功

ChannelFuture f = future.channel().closeFuture();//等待连接关闭断开

f.sync();//同步阻塞等待连接关闭完成

} catch (InterruptedException e) {

e.printStackTrace();

}finally {

workGroup.shutdownGracefully();//清理资源

}

}

public static void main(String[] args) {

new TimeClient().connect(8080, "localhost");

}

private class TimeChannelHandler extends ChannelHandlerAdapter{

private int count=0;

//该方法会在建立连接后执行,表示触发了连接建立事件

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

for(int i=1;i<=100;i++){

String message = "query server time"+i+System.getProperty("line.separator");

ByteBuf req = Unpooled.copiedBuffer(message.getBytes());

ctx.writeAndFlush(req);//向服务端发送数据

}

}

//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String message = (String) msg;

System.out.println(message+";count="+ ++count);//读取服务端发送过来的数据,

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

/**

* Netty实现的时间服务器服务端粘包拆包问题修正版

*/

public class TimeServer {

public static void main(String[] args) {

try {

new TimeServer().bind(8080);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

//绑定端口

public void bind(int port) throws InterruptedException {

//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组

EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接

EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作

try {

//Netty服务端启动类配置

ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可

serverBootstrap.group(bossGroup, workGroup);//配置线程池

//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接

serverBootstrap.channel(NioServerSocketChannel.class);

//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒

serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);

//和上面哪行代码一样,配置TCP连接的backlog参数为1024

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);

//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel

serverBootstrap.childHandler(new ChildChannelHandler());

//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取

ChannelFuture future1 = serverBootstrap.bind(port);

//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果

future1.sync();

//closeFuture表示关闭连接,该方法会返回一个ChannelFuture,关闭连接操作结果会存储在该对象中

ChannelFuture future2 = future1.channel().closeFuture();

//阻塞当前线程,直到future2总可以获取到关闭连接操作结果

future2.sync();

} finally {

//释放线程池资源

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

ch.pipeline().addLast(new StringDecoder());

//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器

ch.pipeline().addLast(new TimeServerHandler());

}

}

//针对于SocketChannel的处理器,处理SocketChannel中的IO操作

private class TimeServerHandler extends ChannelHandlerAdapter {

//读取数据,并对数据进行处理

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String body = (String) msg;

System.out.println("server received:" + body);//输出

//响应数据

String res = "server current time:" + System.currentTimeMillis()+System.getProperty("line.separator");

ByteBuf data = Unpooled.copiedBuffer(res.getBytes());

ctx.writeAndFlush(data);

}

//数据读取过程中发生异常情况,要进行的后续处理都在该方法中

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

    该类LineBasedFrameDecoder是已经默认换行符为消息结束标志,无法修改;Netty还提供了另外一个类,可以自由指定结束标志字符,这个类就是DelimiterBasedFrameDecoder,可以通过该类指定字符$为消息结束标志字符。代码如下

public class TimeClient {

public void connect(int port, String host) {

//创建客户端的NIO线程组

EventLoopGroup workGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();//创建客户辅助启动类

try {

bootstrap.group(workGroup);//绑定NIO线程组

bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {

@Override

protected void initChannel(NioSocketChannel ch) throws Exception {

ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());

ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));

ch.pipeline().addLast(new StringDecoder());

ch.pipeline().addLast(new TimeChannelHandler());

}

});//设置Handler

//bootstrap.bind("8081");

ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接

future.sync();//同步阻塞等待连接建立成功

ChannelFuture f = future.channel().closeFuture();//等待连接关闭断开

f.sync();//同步阻塞等待连接关闭完成

} catch (InterruptedException e) {

e.printStackTrace();

}finally {

workGroup.shutdownGracefully();//清理资源

}

}

public static void main(String[] args) {

new TimeClient().connect(8080, "localhost");

}

private class TimeChannelHandler extends ChannelHandlerAdapter{

private int count=0;

//该方法会在建立连接后执行,表示触发了连接建立事件

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

for(int i=1;i<=100;i++){

String message = "query server time"+i+"$";

ByteBuf req = Unpooled.copiedBuffer(message.getBytes());

ctx.writeAndFlush(req);//向服务端发送数据

}

}

//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String message = (String) msg;

System.out.println(message+";count="+ ++count);//读取服务端发送过来的数据,

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

public class TimeServer {

public static void main(String[] args) {

try {

new TimeServer().bind(8080);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

//绑定端口

public void bind(int port) throws InterruptedException {

//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组

EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接

EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作

try {

//Netty服务端启动类配置

ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可

serverBootstrap.group(bossGroup, workGroup);//配置线程池

//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接

serverBootstrap.channel(NioServerSocketChannel.class);

//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒

serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);

//和上面哪行代码一样,配置TCP连接的backlog参数为1024

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);

//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel

serverBootstrap.childHandler(new ChildChannelHandler());

//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取

ChannelFuture future1 = serverBootstrap.bind(port);

//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果

future1.sync();

//closeFuture表示关闭连接,该方法会返回一个ChannelFuture,关闭连接操作结果会存储在该对象中

ChannelFuture future2 = future1.channel().closeFuture();

//阻塞当前线程,直到future2总可以获取到关闭连接操作结果

future2.sync();

} finally {

//释放线程池资源

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());

ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));

ch.pipeline().addLast(new StringDecoder());

//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器

ch.pipeline().addLast(new TimeServerHandler());

}

}

//针对于SocketChannel的处理器,处理SocketChannel中的IO操作

private class TimeServerHandler extends ChannelHandlerAdapter {

//读取数据,并对数据进行处理

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String body = (String) msg;

System.out.println("server received:" + body);//输出

//响应数据

String res = "server current time:" + System.currentTimeMillis()+"$";

ByteBuf data = Unpooled.copiedBuffer(res.getBytes());

ctx.writeAndFlush(data);

}

//数据读取过程中发生异常情况,要进行的后续处理都在该方法中

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

    2. 消息定长解决消息数据的半包读取问题

    实际上,如果你可以确定每次发送的最大消息长度的话,那么可以采用消息定长的方式来解决,当然该方式必须保证消息定长的长度不会很大,可以使用Netty提供的FixedLengthFrameDecoder类实现消息定长。代码如下

public class TimeClient {

public void connect(int port, String host) {

//创建客户端的NIO线程组

EventLoopGroup workGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();//创建客户辅助启动类

try {

bootstrap.group(workGroup);//绑定NIO线程组

bootstrap.channel(NioSocketChannel.class);//初始创建的Channel类型

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {

@Override

protected void initChannel(NioSocketChannel ch) throws Exception {

ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));

ch.pipeline().addLast(new StringDecoder());

ch.pipeline().addLast(new TimeChannelHandler());

}

});//设置Handler

//bootstrap.bind("8081");

ChannelFuture future = bootstrap.connect(host, port);//与服务端建立连接

future.sync();//同步阻塞等待连接建立成功

ChannelFuture f = future.channel().closeFuture();//等待连接关闭断开

f.sync();//同步阻塞等待连接关闭完成

} catch (InterruptedException e) {

e.printStackTrace();

}finally {

workGroup.shutdownGracefully();//清理资源

}

}

public static void main(String[] args) {

new TimeClient().connect(8080, "localhost");

}

private class TimeChannelHandler extends ChannelHandlerAdapter{

private int count=0;

//该方法会在建立连接后执行,表示触发了连接建立事件

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

for(int i=1;i<=100;i++){

String message = "query server time"+i+"$";

ByteBuf req = Unpooled.copiedBuffer(message.getBytes());

ctx.writeAndFlush(req);//向服务端发送数据

}

}

//读取服务端发送的数据,如果服务端发送了数据,Channel触发了可读事件,那么该方法就会执行,读取数据

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String message = (String) msg;

System.out.println(message+";count="+ ++count);//读取服务端发送过来的数据,

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

public class TimeServer {

public static void main(String[] args) {

try {

new TimeServer().bind(8080);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

//绑定端口

public void bind(int port) throws InterruptedException {

//想想Netty的线程模型,我们需要首先创建Reactor模式的线程组,有两个线程组

EventLoopGroup bossGroup = new NioEventLoopGroup();//主线程组,用于接收连接请求,并建立连接

EventLoopGroup workGroup = new NioEventLoopGroup();//工作线程组,该线程组通过SocketChannel进行IO操作

try {

//Netty服务端启动类配置

ServerBootstrap serverBootstrap = new ServerBootstrap();//创建服务端启动类,该类是辅助服务端启动的类,主要目的是降低开发复杂度,我们只需要配置相关参数即可

serverBootstrap.group(bossGroup, workGroup);//配置线程池

//配置服务端的NioServerSocketChannel,既然是作为服务端,肯定是需要一个ServerSocketChannel来接收客户端连接请求并建立连接

serverBootstrap.channel(NioServerSocketChannel.class);

//配置NioServerSocketChannel的参数,或者说NioServerSocketChannel对应的TCP连接的参数,比如该参数为连接超时参数,为10000毫秒

serverBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);

//和上面哪行代码一样,配置TCP连接的backlog参数为1024

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);

//最后,绑定IO事件的处理类ChildChannelHandler,该类就是Reactor模型中的Handler,用于处理IO事件,不过这里的Handler是专门处理ACCEPT事件的,也就是建立连接创建SocketChannel

serverBootstrap.childHandler(new ChildChannelHandler());

//服务端辅助启动类配置完成后,就可以绑定监听端口了,绑定结果会通过ChannelFuture来获取

ChannelFuture future1 = serverBootstrap.bind(port);

//这行代码表示阻塞当前线程,直到future1中能够获取绑定操作结果

future1.sync();

//closeFuture表示关闭连接,该方法会返回一个ChannelFuture,关闭连接操作结果会存储在该对象中

ChannelFuture future2 = future1.channel().closeFuture();

//阻塞当前线程,直到future2总可以获取到关闭连接操作结果

future2.sync();

} finally {

//释放线程池资源

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

//这个类主要用于监听ServerSocketChannel接收连接请求并建立连接事件,也就是相当于NIO中的ACCEPT事件

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));

ch.pipeline().addLast(new StringDecoder());

//建立连接后创建的的SocketChannel,会在该方法中进行处理,添加响应的handler处理器

ch.pipeline().addLast(new TimeServerHandler());

}

}

//针对于SocketChannel的处理器,处理SocketChannel中的IO操作

private class TimeServerHandler extends ChannelHandlerAdapter {

//读取数据,并对数据进行处理

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

String body = (String) msg;

System.out.println("server received:" + body);//输出

//响应数据

String res = "server current time:" + System.currentTimeMillis()+"$";

ByteBuf data = Unpooled.copiedBuffer(res.getBytes());

ctx.writeAndFlush(data);

}

//数据读取过程中发生异常情况,要进行的后续处理都在该方法中

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

ctx.close();

}

}

}

    3. 定义应用层协议,规定消息结构

    上面两种方式基本都适用于比较小的文本消息数据传输,但如果针对于复杂的情况,比如既能传输文本数据,也能传输各种文件,那么就比较复杂了,因为TCP协议属于传输层协议,其不会理解传输的二进制数据代表着什么,所以就必须要定义一个上层应用层的协议,来对TCP接收的数据进行解析。

    协议就不细说了,简单来说就是客户端服务端双方的约定,协议规定了客户端发送数据的格式,也规定了服务端解析数据的方式。常见的HTTP协议就是一个很好的例子,其采用了多种方式解决半包读取的问题,首先就是指定了消息的结构(比如请求头、请求行、请求体),并且对于其中的请求头、请求头和请求行肯定也需要一个结束符表示,消息子结构之间的间隔会添加一个换行符来表示结束。可以去详细了解一下Http协议的格式。

    Http协议相应的处理Handler肯定不用说,Netty中已经准备好了,只需直接调用就可以实现一个简单的Http协议的文件服务器。代码如下

public class HttpFileServer {

private static final String DEFAULT_URL = "/src/netty_demo/";

public static void main(String[] args) {

int port = 8080;

try {

new HttpFileServer().run(port, DEFAULT_URL);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

//服务器启动方法

public void run(final int port,final String url) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workGroup = new NioEventLoopGroup();

try {

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.channel(NioServerSocketChannel.class);

bootstrap.group(bossGroup, workGroup);

bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {

@Override

protected void initChannel(NioSocketChannel ch) throws Exception {

ch.pipeline().addLast("http-decoder",

new HttpRequestDecoder());//添加HTTP请求消息解码器

ch.pipeline().addLast("http-aggregator",

new HttpObjectAggregator(65536));//该解码器用于将多个Http消息合并为一个完整的HTTP消息,

ch.pipeline().addLast("http-encoder",

new HttpResponseEncoder());//HTTP响应数据编码器

ch.pipeline().addLast("http-chunked",

new ChunkedWriteHandler());//该handler用于解决异步传输大码流(如文件传输),但不会占用太多内存,防止内存溢出

ch.pipeline().addLast("fileServerHandler",

new HttpFileServerHandler(url));

}

});

ChannelFuture cf = bootstrap.bind(port).sync();

System.out.println("HTTP文件服务器启动,地址为:http://localhost:8080"+url);

cf.channel().closeFuture().sync();

} finally {

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

//返回HTTP响应,表示该请求错误

private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status){

FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,

status, Unpooled.copiedBuffer(status.toString()+"

", CharsetUtil.UTF_8));

response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");

ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

}

//设置HTTP响应的响应头

private static void setContentTypeHeader(HttpResponse response, File file) {

MimetypesFileTypeMap mimetypesFileTypeMap = new MimetypesFileTypeMap();

response.headers().set(HttpHeaderNames.CONTENT_TYPE, mimetypesFileTypeMap.getContentType(file.getPath()));

}

//响应一个重定向消息

private static void sendRedirect(ChannelHandlerContext ctx, String uri) {

FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FOUND);

response.headers().set(HttpHeaderNames.LOCATION, uri);

ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

}

//返回文件列表的html页面

private static void sendFileList(ChannelHandlerContext ctx, File dir) {

FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,

HttpResponseStatus.OK);

response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");

StringBuilder html = new StringBuilder();

String dirPath = dir.getPath();

html.append("<!DOCTYPE html>

" +

"<html>

" +

"<head>");

html.append("<title>"+dirPath+"目录:"+"</title>");

html.append("</head>");

html.append("<body>

");

html.append("<h3>").append(dirPath).append("目录:").append("</h3>

");

html.append("<ul>");

html.append("<li>链接:<a href="../">..</a></li>

");

for (File f:dir.listFiles()) {

if (f.isHidden() || !f.canRead()) {

continue;

}

String name = f.getName();

html.append("<li>链接:<a href=""+name+"">"+name+"</a></li>

");

}

html.append("</ul>");

html.append("</body>

");

html.append("</html>");

ByteBuf buf = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8);

response.content().writeBytes(buf);

buf.release();

ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

}

//HTTP请求入栈后进行的一系列操作,包括进行响应消息

private class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

private String url;

public HttpFileServerHandler(String url) {

this.url = url;

}

//解码uri,并转为本地文件目录

private String decodeUri(String uri) {

try {

uri = URLDecoder.decode(uri, "UTF-8");

} catch (UnsupportedEncodingException e) {

try {

uri = URLDecoder.decode(uri, "ISO-8859-1");

} catch (UnsupportedEncodingException ex) {

throw new RuntimeException("请求路径异常");

}

}

if (!uri.startsWith(DEFAULT_URL)) {

return null;

}

if (!uri.startsWith("/")) {

return null;

}

uri = uri.replace("/", File.separatorChar);

String path = System.getProperty("user.dir")+uri;

return path;

}

//接收到请求消息后进行的处理

@Override

protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {

//首先判断请求路径正确性

if(request.decoderResult().isFailure()){

sendError(ctx, HttpResponseStatus.BAD_REQUEST);

return;

}

if(request.method()!=HttpMethod.GET) {

sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);

return;

}

String uri = request.uri();

String path = decodeUri(uri);

if (path == null) {

sendError(ctx, HttpResponseStatus.FORBIDDEN);

return;

}

File file = new File(path);

if (file.isHidden() || !file.exists()) {

sendError(ctx, HttpResponseStatus.NOT_FOUND);

return;

}

//如果请求路径对应的本地文件是一个文件夹(目录)那么就将该目录下的文件变成一个列表展示在HTML页面中,

//并将该页面作为响应消息返回给客户端

if (file.isDirectory()) {

if(uri.endsWith("/")) {

sendFileList(ctx, file);

} else {

sendRedirect(ctx, uri+"/");

}

return;

}

if (!file.isFile()) {

sendError(ctx, HttpResponseStatus.FORBIDDEN);

return;

}

//如果请求路径对应的目标是文件,那么就开启文件传输

RandomAccessFile accessFile = null;

try {

accessFile = new RandomAccessFile(file, "r");//以只读方式打开文件

}catch (FileNotFoundException e) {

sendError(ctx, HttpResponseStatus.NOT_FOUND);

return;

}

long fileLength = accessFile.length();

//创建并设置响应头和响应行

HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);

setContentTypeHeader(response, file);

response.headers().set(HttpHeaderNames.CONTENT_LENGTH, fileLength+"");

String connection = request.headers().get(HttpHeaderNames.CONNECTION).toString();

if(connection.contentEquals(HttpHeaderValues.KEEP_ALIVE)) {

response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);

}

ctx.write(response);

//创建并设置响应体,即为文件数据

ChannelFuture sendFileFuture;

sendFileFuture = ctx.write(new ChunkedFile(accessFile, 0, fileLength, 8192), ctx.newProgressivePromise());

sendFileFuture.addListener(new ChannelProgressiveFutureListener() {

@Override

public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {

if (total < 0) {

System.out.println("文件传输中,已发送:"+progress+"byte");

}else {

System.out.println("文件传输中,已发送:"+progress+"/"+total+"byte");

}

}

@Override

public void operationComplete(ChannelProgressiveFuture future) throws Exception {

System.out.println("文件传输完成");

}

});

ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);

if(connection.contentEquals(HttpHeaderValues.KEEP_ALIVE)) {

lastContentFuture.addListener(ChannelFutureListener.CLOSE);

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

cause.printStackTrace();

if (ctx.channel().isActive()) {

sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);

}

}

}

}

    如果看完了Http协议服务器代码,你会觉得似乎nginx中对于反向代理的实现代码似乎就是这样的,当然,其中的路径等参数并不是写死了,而是通过nginx的配置文件定义,并且定义一系列的负载均衡规则等,实际代码肯定要复杂的多,但是原理是一样的。

3. 总结

    在对Netty进行使用之后,相信10个人里10个人都不会愿意再去使用NIO的原生类库了,Netty相较于NIO原生类库的最大优势就在这一点上,开发复杂度低,而且类库齐全,功能强大。

以上是 Netty学习笔记(10)——Netty应用示例(1) 的全部内容, 来源链接: utcz.com/z/511126.html

回到顶部