【Java】Netty学习笔记(一)初遇篇

Netty学习笔记(一)初遇篇

北冥有只鱼发布于 41 分钟前

开始之前

这里我们在复习一下简单的复习NIO的三个重要的核心知识点,Selector(选择器)、Channel(通道)、Buffer(缓冲区)。这三个概念是从I/O多路复用抽象而来,网络通信的开始是建立连接(对于TCP协议来说),在建立连接后,双方开始互相发送数据。那么如果是在很多客户端要和服务端进行通信,那么就会有很多连接,但是就算是很多客户端发起请求想要和服务端建立连接,那么发起的请求也是有先后顺序的,更为准确的说,和服务端建立连接也是有先后顺序的,多路复用的思路在Java的实现就是选择器管理通道,连接建立完成,也就是认为可以通信了,但是数据可能还没准备好,选择器可以在数据到达完成的时候才处理数据。

我们可以认为Channel是对连接的抽象,TCP是面向连接的,那么连接建立之后,我们可以认为就通信的双方就建立了一条信道,就可以用来传输数据了,数据我们存放在缓冲区中。选择器负责管理通道,当通道中有选择器感兴趣的事件(可以读或者可以写)之后,选择器就可以选中这个通道,由程序做对应的数据处理,或者读或者写。

缘起

Nowadays we use general purpose applications or libraries to communicate with each other. For example, we often use an HTTP client library to retrieve information from a web server and to invoke a remote procedure call via web services. However, a general purpose protocol or its implementation sometimes does not scale very well. It is like how we don't use a general purpose HTTP server to exchange huge files, e-mail messages, and near-realtime messages such as financial information and multiplayer game data. What's required is a highly optimized protocol implementation that is dedicated to a special purpose. For example, you might want to implement an HTTP server that is optimized for AJAX-based chat application, media streaming, or large file transfer. You could even want to design and implement a whole new protocol that is precisely tailored to your need. Another inevitable case is when you have to deal with a legacy proprietary protocol to ensure the interoperability with an old system. What matters in this case is how quickly we can implement that protocol while not sacrificing the stability and performance of the resulting application

如今我们用不同的应用或者库来进行交流,例如,我们经常使用HTTP Client的库从WEB服务器检索信息,通过RPC去调用WEB服务。然而,通用协议的扩展性是不高的,就像我们不会使用通用的HTTP协议去实现交换绝大的文件,Email信息,近乎实时的消息,比如财务信息和多人游戏的数据。我们需要的是一个高性能的协议实现,专门用于一个特殊的目的。比如,你可能想要实现一个HTTP服务器用于Ajax的聊天应用程序、媒体流、大型文件传输。甚至想自己设计实现一个协议。即使你实现了新协议,你也得保证与旧系统的兼容性。在这种情况下,重要的是我们能够以多快的速度实现该协议,同时又不会对最终的应用程序的稳定性和性能产生影响。

总结

Netty为低延时、自定义协议打造,通用的协议扩展性不高,实时性不高。有的时候我们想单独定制一个网络协议,又想和系统中使用的旧有协议保持兼容。那么就需要用到NIO了,但是JDK原生的NIO存在问题,写起来繁琐。我们希望简单一些,这就是Netty的缘起。

简介

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

Netty是一个用来快速开发网络应用程序(网络协议的客户端和服务端)的NIO框架,Netty能够让开发网络应用程序(比如TCP和UDP的服务端)更为简单和高效
快速和简单并不意味这Netty会有性能和可靠性上的问题。Netty是经过精心设计的,在实现许多的网络协议,比如FTP、SMTP、HTTP、各种二进制和基于文本的协议,有着良好的实践。原因在于,Netty已经成功地找到了一种方法来实现开发的易用性、性能、稳定性和灵活性。

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

Netty是一个异步事件驱动的网络通信框架,用于快速开发可维护的高性能协议服务器和客户端。
到这里我们就可以大致给Netty定性,Netty首先是一个NIO框架,性能好,稳定性强,可以被用来做应用服务器(比如Tomcat)。可以用来实现网络协议。

在 NIO 学习笔记(二)相识篇中我们实现了一个聊天室,写起来十分复杂,在JDK的某些版本(有人说JDK在1.8已经解决了这个问题,但是有人说还没解决,有兴趣的可以去搜一搜JDK NIO 空轮询),还存在一些BUG。我们都喜欢简单的东西,于是Netty应运而生,简化了NIO的编程,解决了JDK原生NIO编程的BUG。

在哪些地方用到了

  • Dubbo

    • 引入的依赖用到了Netty(RPC(远程过程调用,也可以理解为通信))

  • RocketMQ - 经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨节点通信,它的 Netty Service 基于 Netty 框架二次封装实现。

基本上在Java语言内,需要用到网络通信的,都有Netty的影子。

特点

  • 稳定

  • 设计

    • Unified API for various transport types - blocking and non-blocking socket

    • Based on a flexible and extensible event model which allows clear separation of concerns

    • Highly customizable thread model - single thread, one or more thread pools such as SEDA

    • True connectionless datagram socket support (since 3.1)

  • 易用性(Ease of use)

    • Well-documented Javadoc, user guide and examples

  • No additional dependencies, JDK 5 (Netty 3.x) or 6 (Netty 4.x) is enough.

辅助测试工具 curl

curl 是常用的命令行工具, 用来请求 Web 服务器。它的名字就是客户端(client)的 URL 工具的意思。
在Netty系列的学习笔记中,我们用Netty来实现一个简单的服务器,我们使用curl和浏览器来测试这个服务器。
如何安装参考Windows下安装使用curl命令

当然是从Hello World开始了啊

简单实例

我们从一个例子来介绍Netty的使用,首先我们还是一个maven工程,然后引入对应的依赖:
【Java】Netty学习笔记(一)初遇篇

<dependency>

<groupId>io.netty</groupId>

<artifactId>netty-all</artifactId>

<version>4.1.56.Final</version>

</dependency>

public class NettyDemo01 {

public static void main(String[] args) throws InterruptedException {

// 创建事件循环组 接收连接

EventLoopGroup bossGroup = new NioEventLoopGroup();

// 接收连接 并分发给worker线程

EventLoopGroup workerGroup = new NioEventLoopGroup();

// 启动Netty

ServerBootstrap serverBootstrap = new ServerBootstrap();

ChannelFuture channelFuture

= serverBootstrap.group(bossGroup, workerGroup).

channel(NioServerSocketChannel.class).

childHandler(new NettyServerInitHandler()).bind(8080).sync();

// Wait until the server socket is closed.

// 等待直到服务端的socket关闭,在这个例子中,服务端的Socket永远不会关闭,但是你可以优雅的关闭

// 你的服务

// In this example, this does not happen, but you can do that to gracefully

// shut down your server.

channelFuture.channel().closeFuture().sync();

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

// 综合初始化器

public class NettyServerInitHandler extends ChannelInitializer<SocketChannel> {

// 通道被初次注册执行此方法等价于连接被建立

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

// 将处理器加入

pipeline.addLast("HttpServerCodec",new HttpServerCodec());

// 将自定义的处理加入

pipeline.addLast("NettyServerHandler",new NettyServerHandler());

}

}

public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

//如果消息的类型是Http请求,才处理

if (msg instanceof HttpRequest){

// 统一编码

ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);

// 发送HTTP响应

DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

// 设置请求头

response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");

//

response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());

// 返回响应

ctx.writeAndFlush(response);

}

}

}

在浏览器输入localhost:8080 的测试结果:
【Java】Netty学习笔记(一)初遇篇
在curl输入localhost:8080的测试结果:
【Java】Netty学习笔记(一)初遇篇
浏览器的测试结果字很小,我调大了才这么显示。

基本分析

还记得Java的网络编程吗? 我们要写Java的NIO做网络编程,通常是下面这样:

   Selector selector = Selector.open();

//创建服务端的通道

ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();

// 将该通道设置为非阻塞方式

serverSocketChannel1.configureBlocking(false);

// 创建Socket

ServerSocket serverSocket = serverSocketChannel1.socket();

// 绑定端口

serverSocket.bind(new InetSocketAddress(8888));

// 为ServerSocketChannel注册对对客户端请求连接事件感兴趣

// 此时该channel处于选择器得管理之下

serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);

基本上核心的思想是先生成选择器对象(Selector),然后将通道设置为非阻塞模式,然后监听端口,然后将通道交给我们产生的选择器管理。然后获取选择器上就绪的事件,根据不同的就绪事件做不同的数据处理。
Netty将其简化了,提供了统一的API,EventLoopGroup用于接收连接和处理连接,我们需要两个EventLoopGroup,一个用于管理连接,另一个处理连接。在ServerBootstrap指定哪个管理连接,哪个处理连接。最后我们还是要处理数据,也就是在ServerBootstrap指定数据的处理者。
Netty处理的基本流程:
【Java】Netty学习笔记(一)初遇篇
相对于原生的NIO,逻辑又清晰了很多。

总结一下使用Netty编码的基本流程

  • 入口类/主程序类

    • 配置一些参数(端口,谁处理连接)

  • 内置初始化器
    -调用一些内置的类(编码、解码)
  • 自定义初始化器

    • 编写一些自定义的类(处理数据)

    Netty内部提供了非常强大的类库(内置初始化器),每个初始化器都可以完成一个小功能,以后在开发时,我们第一步需要先将需要完成的功能分解成若干部,第二部只需要在netty类库中寻找,看哪些已有类能够帮助我们直接实现;第三步,如果某个功能Netty没有提供,则编写自定义的初始化器。

基本API梳理

  • 继承链

【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
注意InBound和OutBound。

  • 常用的API

我们结合继承连来说常用的API,从SimpleChannelInboundHandler开始吧:
【Java】Netty学习笔记(一)初遇篇
ChannelHandlerAdapter有两个空方法,是从ChannelHandler(是一个接口)中而来。
【Java】Netty学习笔记(一)初遇篇
然后方法上没有说明,我们去看对应的接口上的方法说明:
【Java】Netty学习笔记(一)初遇篇

 /**

* Gets called after the {@link ChannelHandler} was added to the actual context

and it's ready to handle events.

在对应的ChannelHandler(处理器)被加入到实际的上下文,

该处理器准备去处理一些事件时 被调用。

*

*/

void handlerAdded(ChannelHandlerContext ctx) throws Exception;

同样的handlerRemoved是在对应的处理器被移除之后,被调用。

  • 连接建立 (通道被注册)
  • 有数据到来 (通道被激活)
  • 数据被读取完毕(通道失活)
  • 连接取消(通道取消注册)

我们重写这些方法来再次看下HTTP协议

再度审视HTTP协议

public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

if (msg instanceof HttpRequest) {

HttpRequest httpRequest = (HttpRequest) msg;

URI uri = new URI(httpRequest.uri());

// 浏览器请求的时候会请求两次,会默认请求一次icon

// 这里先不处理这次请求。

if (!"/favicon.ico".equals(uri.getPath())) {

System.out.println(httpRequest.method().name());

ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);

DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");

response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

ctx.writeAndFlush(response);

}

}

}

// 增加处理时,自动触发

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

System.out.println("1.增加处理器");

super.handlerAdded(ctx);

}

// 当把通道注册到选择器触发

@Override

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("2.通道被注册");

super.channelRegistered(ctx);

}

// 通道被激活,触发另一个方法

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println("3.通道被激活");

super.channelActive(ctx);

}

// 激活的通道,失去激活状态

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

System.out.println("4.通道失活");

super.channelInactive(ctx);

}

// 通道取消注册

@Override

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("5.通道取消注册");

super.channelUnregistered(ctx);

}

// 失败捕获

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

System.out.println("6.异常捕获");

super.exceptionCaught(ctx, cause);

}

}

这里我认为你已经装上了curl了,我们启动服务端来测试一下。

  • curl的测试结果

【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
对应网络请求一次请求,一次响应。
我们现在用浏览器来测试一下:
【Java】Netty学习笔记(一)初遇篇
有可能你出现的是这种情况:
【Java】Netty学习笔记(一)初遇篇
我们首先来分析一下为什么会出现这种状况:
【Java】Netty学习笔记(一)初遇篇
首先是浏览器发起了两次请求,一次请求就是一个连接。
【Java】Netty学习笔记(一)初遇篇
所以是两次,那为什么两次结果都不一样呢?
这就跟建立连接的先后顺序有关系了,第一种结果就是两次连接建立起来的时间间隔比较短造成的。所以是112233.
而第二种就是在第一个连接建立完毕的时候,第二个请求才建立起连接。
然后我们清空控制台,再来请求一下看一下结果:
【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
这种情况是为什么呢? 因为我们对于浏览器发起的favicon.ico请求没有响应,这个请求还一直在等待中。
我们再次请求的时候,浏览器会将上一次的请求取消,也就是连接取消建立,再度请求一次。
所以会出现1、2、3、4、5这种情况。
那么为什么我们处理了的请求,为很么连接没有取消呢?
这就涉及到HTTP协议了,我们知道HTTP是无状态的,也就是说每次客户端和服务端建立连接都是要TCP建立连接,再传送数据,但有的时候这样开销有点大,我们并不希望每次HTTP请求都建立TCP连接,那能不能保持上一次的连接呢?
这就是keep-alive,保持上一次的连接,我们上面指定的HTTP的协议是默认开启的,所以对于处理了的请求,并不会断开连接。在关闭网页之后就会断掉连接。
【Java】Netty学习笔记(一)初遇篇

再度总结

我们现在已经对Netty已经有一个大致的认知,知道这是一个高性能的、高扩展性的NIO框架。也能通过一些Netty提供的接口来网络编程了。使用Netty进行网络编程的流程是类似的,像JDBC一样,先是在在ServerBootstrap 中设定监测连接的和处理连接的,然后在ServerBootstrap 设定综合处理器(也就是NettyServerInitHandler),在综合处理器中加入Netty提供的处理器和自定义的处理器。在自定义的处理器中处理请求和响应。如果你还想在连接建立的时候,做一些工作,Netty也能满足你的需求。刚开始可能比较晕哈,这个正常,可以先大致记住这个流程

事件驱动机制与Reactor模式简介

Netty将自身定义为异步事件驱动的网络编程框架,那什么是事件驱动,通俗的说,就是在Netty中某事件发生会触发某些方法,在上面的基本API梳理中,我们已经发现了,连接建立触发一个方法,取消连接触发一个方法。更为准确的说法是Netty在收到Socket数据流的时候,会在pipeline上的处理器上传播,某些事件被触发即为调用对应的方法。
那Reactor(反应器)模式呢? 这是Netty的高性能就在于Reactor(反应器模式)模式上,Reactor是这样一种模式,它要求主线程(I/O处理单元)只负责监听文件描述上是否有事件发生,有的话立刻将该事件通知工作线程。

用Netty编写两个之间的聊天

这个其实只是一个示例,我们用来加深对Netty的理解而已,本身打算放在GitHub上,想了想还是放在这里吧。使用Netty编写代码的套路是一致的:

  • 在入口类设置监测连接的和真正处理连接的
  • 绑定端口
  • 设置初始化处理器(NettyServerInitHandler ),在这个处理器中加入Netty自带的和自己编写的
  • 在自己编写的处理器中,做数据处理工作。

所以上面我们编写的NettyDemo01也可以接着复用,这里我们要再讲一下ChannelPipeline,我们上面已经讲了,Netty在收到Socket数据流的时候会在pipeline上的处理器上传播。然后我们上面调用的都是addlast。所以我们自定义的处理器总是最后被传播上。
在编写对应的代码之前,我们再来一点点网络编程的知识,网络协议中有一个报文的概念,限制了一次发送信息最多发送多少个,TCP协议发送接收数据时,如果数据过大,接收到的数据会是分包的, 这就是通信的双方发送的消息可能是经过了拆分和合并,尽管使用微信聊天的我们可能并没有感觉。
Netty为我们提供拆分和并的类我们只需要将其加入到处理器中就行。除了拆分和合并,我们还得统一编码,不用担心Netty也提供了。
所以上面的初始化处理器就被改造成了这样:

public class MyNettyServerInit extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

// 加入拆分

pipeline.addLast("decoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,8,0,8));

// 加入合并

pipeline.addLast("prepender",new LengthFieldPrepender(8));

// 加入解码

pipeline.addLast("StringDecoder",new StringDecoder(CharsetUtil.UTF_8));

// 加入编码

pipeline.addLast("StringEncoder",new StringEncoder(CharsetUtil.UTF_8));

// 加入自定义的处理器

pipeline.addLast("MyNettyServerHandler",new MyNettyServerHandler());

}

}

LengthFieldBasedFrameDecoder这个类我们简单解释一下,一般我们称这个类为拆包器,我们用的是这个构造函数:

public LengthFieldBasedFrameDecoder(int maxFrameLength,

int lengthFieldOffset,

int lengthFieldLength,

int lengthAdjustment,

int initialBytesToStrip)

maxFrameLength: 最大的报文长度,然后发送过来的信息超过这个,就会报异常,因此通信的双方需要约定一次说多少,
lengthFieldOffset: 一段报文,从哪开始读
LengthFieldBasedFrameDecoder关于这个类我们要细讲,恐怕这篇文章得再加上8000字。我们姑且就这么理解。

public class MyNettyServerHandler extends SimpleChannelInboundHandler<String> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

//接收消息

System.out.println("服务端 接收到了" + ctx.channel().remoteAddress() + ",消息是:" + msg);

//发送消息

System.out.println("请输入内容:");

String send = new Scanner(System.in).nextLine();

ctx.channel().writeAndFlush(send);

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

ctx.writeAndFlush("开始聊天吧...");

}

}

上面我们讲过channelActive这个方法,连接真正建立的时候触发这个方法,为什么这样做呢? 因为我们这个处理器继承的还是SimpleChannelInboundHandler,先读后写,注意这里带的In,就是处理读的,有In就有Out。
聊天的客户端要稍微改动一下:

public class MyNettyClient {

public static void main(String[] args) throws InterruptedException {

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

// 客户端,所以不是ServerBootStrap

Bootstrap bootstrap = new Bootstrap();

ChannelFuture channelFuture = bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).

handler(new MyNettyClientInit()).connect("127.0.0.1", 8080).sync();

channelFuture.channel().closeFuture().sync();

eventLoopGroup.shutdownGracefully();

}

}

MyNettyClientInit可以复用服务端的:

public class MyNettyClientInit extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8));

pipeline.addLast("pretender", new LengthFieldPrepender(8));

pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast("MyNettyClientHandler", new MyNettyClientHandler());

}

}

客户端处理器

public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

System.out.println("客户端 接收到了消息: " + ctx.channel().remoteAddress() + ",消息是" + msg);

System.out.println("请输入内容:");

String send = new Scanner(System.in).nextLine();

ctx.channel().writeAndFlush(send);

}

}

注意上面的建立连接时,发送的信息只能由一段发出,如果两端同时有,则两端再次陷入等待对方发送信息的状态。

有什么用

心跳机制

微服务正在称为常态的今天,微服务有个核心的组件也被我们所熟知,就是注册中心,注册中心该怎么知道某服务是活着的呢?我们通过心跳来判断人是否活着,那么注册中心也是通过心跳,每隔一段时间和服务进行通信,活着发送或者接收。
在一段时间内没收到信息,那么注册中心就可以认为该服务因为某些原因下线了。Netty也为我们提供了这样的类库,也是Netty内置的一个处理器IdleStateHandler。

 public IdleStateHandler( int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds)

readerIdleTimeSeconds: 每隔几秒读一下
writerIdleTimeSeconds: 每隔几秒写一下
allIdleTimeSeconds: 读或写超时时间
注意是两次间隔,客户端发送过来一次请求,服务端即完成了一次读写。
示例:

public class MyNettyServerInit extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("idleStateEvent", new IdleStateHandler(2,3,6));

pipeline.addLast("myIdleStatteHandler",new MyIdleStateHandler());

}

}

public class MyIdleStateHandler extends SimpleChannelInboundHandler<Object> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

}

// 读超时 或 写超时触发该方法

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent){

String eventType = null;

IdleStateEvent event = (IdleStateEvent)evt;

switch (event.state()){

case READER_IDLE:

// 客户端发送的两次请求间隔超过两秒触发

eventType = "读超时";

break;

case WRITER_IDLE:

// 客户端发送的两次请求间隔超过三秒触发

eventType = "写超时";

break;

case ALL_IDLE:

// 六秒内读或写都没触发,认为读写超时

eventType = "读写超时";

break;

}

System.out.println(eventType);

ctx.channel().close();

}

}

}

下面我们用curl测试下:
【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
写超时不再测试

netty实现webSocket

WebSocket简介

HTTP协议一向是客户端发起请求,服务端回应,但有的情况下,我们希望服务端主动的向客户端推送。这就是WebSocket协议。在之前服务端要想向客户端主动推送信息,要么是轮询(隔断时间问一下),要么一直是监听(要么是一直不挂电话),有响应之后,再度重新建立HTTP连接。这种方式相当消耗资源。WebSocket就能够做到一次HTTP信息,多次数据发送。

示例

java端示例:

public class NettyServerInitHandler extends ChannelInitializer<SocketChannel> {

// 通道被注册执行此方法

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("HttpServerCodec",new HttpServerCodec());

// 我们用POST方式请求服务器的时候,对应的参数信息是保存在message body中的,如果只是单纯的用HttpServerCodec

// 是无法完全的解析Http POST请求的,因为HttpServerCodec只能获取uri中参数,所以需要加上

// HttpObjectAggregator.

pipeline.addLast("HttpObjectAggregator", new HttpObjectAggregator(4096));

// 设定地址

pipeline.addLast("WebSocketServerProtocolHandler",new WebSocketServerProtocolHandler("/myWebSocket"));

pipeline.addLast("NettyServerHandler",new MyWebSocketServerHandler());

}

}

public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

System.out.println("server 接收到的客户端消息: " + msg.text());

ctx.channel().writeAndFlush(new TextWebSocketFrame("hello client"));

}

@Override

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("连接建立....");

}

@Override

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("连接取消建立....");

}

}

<!DOCTYPE html>

<html lang="en">

<head>

<meta charset="UTF-8">

<title>Title</title>

<script type="text/javascript">

// websocket是ws开头

var webSocket = new WebSocket("ws://localhost:8080/myWebSocket");

function sendMessage(msg) {

if (webSocket.readyState == webSocket.OPEN)

webSocket.send(msg);

}

webSocket.onmessage = function (event) {

document.getElementById("tip").innerText = "接收的消息: " + event.data;

}

webSocket.onopen = function (event) {

document.getElementById("tip").innerText = "连接开启";

}

webSocket.onclose = function (event) {

document.getElementById("tip").innerText = "连接关闭";

}

</script>

</head>

<body>

<form>

<textarea name="message">

</textarea>

<input type="button" onclick="sendMessage(this.form.message.value)" value="发送">

</form>

<div id="tip"></div>

</body>

</html>

效果:

顺带讲一下Netty官方指南

这里讲下,Netty开发者指南写的还是挺不错的:
【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
但是对刚写了没多少行代码的初学者来说,看这个就有点晕了,不仅需要良好的英文阅读能力,还需要对网络协议有一定的了解,同时Java基础要扎实,对网络编程要有一定的了解。不然不光看我这篇教程,你也十分晕。
Developer guide 开发者指南

参考资料:

  • 通俗地讲,Netty 能做什么?
  • netty中future.channel().closeFuture().sync()作用
  • Java NIO浅析
  • 如何深刻理解Reactor和Proactor?
  • HTTP Keep-Alive模式
  • netty 的事件驱动
  • 服务器端编程心得(二)—— Reactor模式
  • WebSocket 是什么原理?为什么可以实现持久连接?

java网络编程nioNetty

阅读 26更新于 38 分钟前

本作品系原创,采用《署名-非商业性使用-禁止演绎 4.0 国际》许可协议

avatar

北冥有只鱼

22 声望

9 粉丝

0 条评论

得票时间

avatar

北冥有只鱼

22 声望

9 粉丝

宣传栏

开始之前

这里我们在复习一下简单的复习NIO的三个重要的核心知识点,Selector(选择器)、Channel(通道)、Buffer(缓冲区)。这三个概念是从I/O多路复用抽象而来,网络通信的开始是建立连接(对于TCP协议来说),在建立连接后,双方开始互相发送数据。那么如果是在很多客户端要和服务端进行通信,那么就会有很多连接,但是就算是很多客户端发起请求想要和服务端建立连接,那么发起的请求也是有先后顺序的,更为准确的说,和服务端建立连接也是有先后顺序的,多路复用的思路在Java的实现就是选择器管理通道,连接建立完成,也就是认为可以通信了,但是数据可能还没准备好,选择器可以在数据到达完成的时候才处理数据。

我们可以认为Channel是对连接的抽象,TCP是面向连接的,那么连接建立之后,我们可以认为就通信的双方就建立了一条信道,就可以用来传输数据了,数据我们存放在缓冲区中。选择器负责管理通道,当通道中有选择器感兴趣的事件(可以读或者可以写)之后,选择器就可以选中这个通道,由程序做对应的数据处理,或者读或者写。

缘起

Nowadays we use general purpose applications or libraries to communicate with each other. For example, we often use an HTTP client library to retrieve information from a web server and to invoke a remote procedure call via web services. However, a general purpose protocol or its implementation sometimes does not scale very well. It is like how we don't use a general purpose HTTP server to exchange huge files, e-mail messages, and near-realtime messages such as financial information and multiplayer game data. What's required is a highly optimized protocol implementation that is dedicated to a special purpose. For example, you might want to implement an HTTP server that is optimized for AJAX-based chat application, media streaming, or large file transfer. You could even want to design and implement a whole new protocol that is precisely tailored to your need. Another inevitable case is when you have to deal with a legacy proprietary protocol to ensure the interoperability with an old system. What matters in this case is how quickly we can implement that protocol while not sacrificing the stability and performance of the resulting application

如今我们用不同的应用或者库来进行交流,例如,我们经常使用HTTP Client的库从WEB服务器检索信息,通过RPC去调用WEB服务。然而,通用协议的扩展性是不高的,就像我们不会使用通用的HTTP协议去实现交换绝大的文件,Email信息,近乎实时的消息,比如财务信息和多人游戏的数据。我们需要的是一个高性能的协议实现,专门用于一个特殊的目的。比如,你可能想要实现一个HTTP服务器用于Ajax的聊天应用程序、媒体流、大型文件传输。甚至想自己设计实现一个协议。即使你实现了新协议,你也得保证与旧系统的兼容性。在这种情况下,重要的是我们能够以多快的速度实现该协议,同时又不会对最终的应用程序的稳定性和性能产生影响。

总结

Netty为低延时、自定义协议打造,通用的协议扩展性不高,实时性不高。有的时候我们想单独定制一个网络协议,又想和系统中使用的旧有协议保持兼容。那么就需要用到NIO了,但是JDK原生的NIO存在问题,写起来繁琐。我们希望简单一些,这就是Netty的缘起。

简介

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

Netty是一个用来快速开发网络应用程序(网络协议的客户端和服务端)的NIO框架,Netty能够让开发网络应用程序(比如TCP和UDP的服务端)更为简单和高效
快速和简单并不意味这Netty会有性能和可靠性上的问题。Netty是经过精心设计的,在实现许多的网络协议,比如FTP、SMTP、HTTP、各种二进制和基于文本的协议,有着良好的实践。原因在于,Netty已经成功地找到了一种方法来实现开发的易用性、性能、稳定性和灵活性。

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

Netty是一个异步事件驱动的网络通信框架,用于快速开发可维护的高性能协议服务器和客户端。
到这里我们就可以大致给Netty定性,Netty首先是一个NIO框架,性能好,稳定性强,可以被用来做应用服务器(比如Tomcat)。可以用来实现网络协议。

在 NIO 学习笔记(二)相识篇中我们实现了一个聊天室,写起来十分复杂,在JDK的某些版本(有人说JDK在1.8已经解决了这个问题,但是有人说还没解决,有兴趣的可以去搜一搜JDK NIO 空轮询),还存在一些BUG。我们都喜欢简单的东西,于是Netty应运而生,简化了NIO的编程,解决了JDK原生NIO编程的BUG。

在哪些地方用到了

  • Dubbo

    • 引入的依赖用到了Netty(RPC(远程过程调用,也可以理解为通信))

  • RocketMQ - 经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨节点通信,它的 Netty Service 基于 Netty 框架二次封装实现。

基本上在Java语言内,需要用到网络通信的,都有Netty的影子。

特点

  • 稳定

  • 设计

    • Unified API for various transport types - blocking and non-blocking socket

    • Based on a flexible and extensible event model which allows clear separation of concerns

    • Highly customizable thread model - single thread, one or more thread pools such as SEDA

    • True connectionless datagram socket support (since 3.1)

  • 易用性(Ease of use)

    • Well-documented Javadoc, user guide and examples

  • No additional dependencies, JDK 5 (Netty 3.x) or 6 (Netty 4.x) is enough.

辅助测试工具 curl

curl 是常用的命令行工具, 用来请求 Web 服务器。它的名字就是客户端(client)的 URL 工具的意思。
在Netty系列的学习笔记中,我们用Netty来实现一个简单的服务器,我们使用curl和浏览器来测试这个服务器。
如何安装参考Windows下安装使用curl命令

当然是从Hello World开始了啊

简单实例

我们从一个例子来介绍Netty的使用,首先我们还是一个maven工程,然后引入对应的依赖:
【Java】Netty学习笔记(一)初遇篇

<dependency>

<groupId>io.netty</groupId>

<artifactId>netty-all</artifactId>

<version>4.1.56.Final</version>

</dependency>

public class NettyDemo01 {

public static void main(String[] args) throws InterruptedException {

// 创建事件循环组 接收连接

EventLoopGroup bossGroup = new NioEventLoopGroup();

// 接收连接 并分发给worker线程

EventLoopGroup workerGroup = new NioEventLoopGroup();

// 启动Netty

ServerBootstrap serverBootstrap = new ServerBootstrap();

ChannelFuture channelFuture

= serverBootstrap.group(bossGroup, workerGroup).

channel(NioServerSocketChannel.class).

childHandler(new NettyServerInitHandler()).bind(8080).sync();

// Wait until the server socket is closed.

// 等待直到服务端的socket关闭,在这个例子中,服务端的Socket永远不会关闭,但是你可以优雅的关闭

// 你的服务

// In this example, this does not happen, but you can do that to gracefully

// shut down your server.

channelFuture.channel().closeFuture().sync();

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

// 综合初始化器

public class NettyServerInitHandler extends ChannelInitializer<SocketChannel> {

// 通道被初次注册执行此方法等价于连接被建立

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

// 将处理器加入

pipeline.addLast("HttpServerCodec",new HttpServerCodec());

// 将自定义的处理加入

pipeline.addLast("NettyServerHandler",new NettyServerHandler());

}

}

public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

//如果消息的类型是Http请求,才处理

if (msg instanceof HttpRequest){

// 统一编码

ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);

// 发送HTTP响应

DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

// 设置请求头

response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");

//

response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());

// 返回响应

ctx.writeAndFlush(response);

}

}

}

在浏览器输入localhost:8080 的测试结果:
【Java】Netty学习笔记(一)初遇篇
在curl输入localhost:8080的测试结果:
【Java】Netty学习笔记(一)初遇篇
浏览器的测试结果字很小,我调大了才这么显示。

基本分析

还记得Java的网络编程吗? 我们要写Java的NIO做网络编程,通常是下面这样:

   Selector selector = Selector.open();

//创建服务端的通道

ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();

// 将该通道设置为非阻塞方式

serverSocketChannel1.configureBlocking(false);

// 创建Socket

ServerSocket serverSocket = serverSocketChannel1.socket();

// 绑定端口

serverSocket.bind(new InetSocketAddress(8888));

// 为ServerSocketChannel注册对对客户端请求连接事件感兴趣

// 此时该channel处于选择器得管理之下

serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);

基本上核心的思想是先生成选择器对象(Selector),然后将通道设置为非阻塞模式,然后监听端口,然后将通道交给我们产生的选择器管理。然后获取选择器上就绪的事件,根据不同的就绪事件做不同的数据处理。
Netty将其简化了,提供了统一的API,EventLoopGroup用于接收连接和处理连接,我们需要两个EventLoopGroup,一个用于管理连接,另一个处理连接。在ServerBootstrap指定哪个管理连接,哪个处理连接。最后我们还是要处理数据,也就是在ServerBootstrap指定数据的处理者。
Netty处理的基本流程:
【Java】Netty学习笔记(一)初遇篇
相对于原生的NIO,逻辑又清晰了很多。

总结一下使用Netty编码的基本流程

  • 入口类/主程序类

    • 配置一些参数(端口,谁处理连接)

  • 内置初始化器
    -调用一些内置的类(编码、解码)
  • 自定义初始化器

    • 编写一些自定义的类(处理数据)

    Netty内部提供了非常强大的类库(内置初始化器),每个初始化器都可以完成一个小功能,以后在开发时,我们第一步需要先将需要完成的功能分解成若干部,第二部只需要在netty类库中寻找,看哪些已有类能够帮助我们直接实现;第三步,如果某个功能Netty没有提供,则编写自定义的初始化器。

基本API梳理

  • 继承链

【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
注意InBound和OutBound。

  • 常用的API

我们结合继承连来说常用的API,从SimpleChannelInboundHandler开始吧:
【Java】Netty学习笔记(一)初遇篇
ChannelHandlerAdapter有两个空方法,是从ChannelHandler(是一个接口)中而来。
【Java】Netty学习笔记(一)初遇篇
然后方法上没有说明,我们去看对应的接口上的方法说明:
【Java】Netty学习笔记(一)初遇篇

 /**

* Gets called after the {@link ChannelHandler} was added to the actual context

and it's ready to handle events.

在对应的ChannelHandler(处理器)被加入到实际的上下文,

该处理器准备去处理一些事件时 被调用。

*

*/

void handlerAdded(ChannelHandlerContext ctx) throws Exception;

同样的handlerRemoved是在对应的处理器被移除之后,被调用。

  • 连接建立 (通道被注册)
  • 有数据到来 (通道被激活)
  • 数据被读取完毕(通道失活)
  • 连接取消(通道取消注册)

我们重写这些方法来再次看下HTTP协议

再度审视HTTP协议

public class NettyServerHandler extends SimpleChannelInboundHandler<HttpObject> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

if (msg instanceof HttpRequest) {

HttpRequest httpRequest = (HttpRequest) msg;

URI uri = new URI(httpRequest.uri());

// 浏览器请求的时候会请求两次,会默认请求一次icon

// 这里先不处理这次请求。

if (!"/favicon.ico".equals(uri.getPath())) {

System.out.println(httpRequest.method().name());

ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);

DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");

response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

ctx.writeAndFlush(response);

}

}

}

// 增加处理时,自动触发

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

System.out.println("1.增加处理器");

super.handlerAdded(ctx);

}

// 当把通道注册到选择器触发

@Override

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("2.通道被注册");

super.channelRegistered(ctx);

}

// 通道被激活,触发另一个方法

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println("3.通道被激活");

super.channelActive(ctx);

}

// 激活的通道,失去激活状态

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

System.out.println("4.通道失活");

super.channelInactive(ctx);

}

// 通道取消注册

@Override

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("5.通道取消注册");

super.channelUnregistered(ctx);

}

// 失败捕获

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

System.out.println("6.异常捕获");

super.exceptionCaught(ctx, cause);

}

}

这里我认为你已经装上了curl了,我们启动服务端来测试一下。

  • curl的测试结果

【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
对应网络请求一次请求,一次响应。
我们现在用浏览器来测试一下:
【Java】Netty学习笔记(一)初遇篇
有可能你出现的是这种情况:
【Java】Netty学习笔记(一)初遇篇
我们首先来分析一下为什么会出现这种状况:
【Java】Netty学习笔记(一)初遇篇
首先是浏览器发起了两次请求,一次请求就是一个连接。
【Java】Netty学习笔记(一)初遇篇
所以是两次,那为什么两次结果都不一样呢?
这就跟建立连接的先后顺序有关系了,第一种结果就是两次连接建立起来的时间间隔比较短造成的。所以是112233.
而第二种就是在第一个连接建立完毕的时候,第二个请求才建立起连接。
然后我们清空控制台,再来请求一下看一下结果:
【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
这种情况是为什么呢? 因为我们对于浏览器发起的favicon.ico请求没有响应,这个请求还一直在等待中。
我们再次请求的时候,浏览器会将上一次的请求取消,也就是连接取消建立,再度请求一次。
所以会出现1、2、3、4、5这种情况。
那么为什么我们处理了的请求,为很么连接没有取消呢?
这就涉及到HTTP协议了,我们知道HTTP是无状态的,也就是说每次客户端和服务端建立连接都是要TCP建立连接,再传送数据,但有的时候这样开销有点大,我们并不希望每次HTTP请求都建立TCP连接,那能不能保持上一次的连接呢?
这就是keep-alive,保持上一次的连接,我们上面指定的HTTP的协议是默认开启的,所以对于处理了的请求,并不会断开连接。在关闭网页之后就会断掉连接。
【Java】Netty学习笔记(一)初遇篇

再度总结

我们现在已经对Netty已经有一个大致的认知,知道这是一个高性能的、高扩展性的NIO框架。也能通过一些Netty提供的接口来网络编程了。使用Netty进行网络编程的流程是类似的,像JDBC一样,先是在在ServerBootstrap 中设定监测连接的和处理连接的,然后在ServerBootstrap 设定综合处理器(也就是NettyServerInitHandler),在综合处理器中加入Netty提供的处理器和自定义的处理器。在自定义的处理器中处理请求和响应。如果你还想在连接建立的时候,做一些工作,Netty也能满足你的需求。刚开始可能比较晕哈,这个正常,可以先大致记住这个流程

事件驱动机制与Reactor模式简介

Netty将自身定义为异步事件驱动的网络编程框架,那什么是事件驱动,通俗的说,就是在Netty中某事件发生会触发某些方法,在上面的基本API梳理中,我们已经发现了,连接建立触发一个方法,取消连接触发一个方法。更为准确的说法是Netty在收到Socket数据流的时候,会在pipeline上的处理器上传播,某些事件被触发即为调用对应的方法。
那Reactor(反应器)模式呢? 这是Netty的高性能就在于Reactor(反应器模式)模式上,Reactor是这样一种模式,它要求主线程(I/O处理单元)只负责监听文件描述上是否有事件发生,有的话立刻将该事件通知工作线程。

用Netty编写两个之间的聊天

这个其实只是一个示例,我们用来加深对Netty的理解而已,本身打算放在GitHub上,想了想还是放在这里吧。使用Netty编写代码的套路是一致的:

  • 在入口类设置监测连接的和真正处理连接的
  • 绑定端口
  • 设置初始化处理器(NettyServerInitHandler ),在这个处理器中加入Netty自带的和自己编写的
  • 在自己编写的处理器中,做数据处理工作。

所以上面我们编写的NettyDemo01也可以接着复用,这里我们要再讲一下ChannelPipeline,我们上面已经讲了,Netty在收到Socket数据流的时候会在pipeline上的处理器上传播。然后我们上面调用的都是addlast。所以我们自定义的处理器总是最后被传播上。
在编写对应的代码之前,我们再来一点点网络编程的知识,网络协议中有一个报文的概念,限制了一次发送信息最多发送多少个,TCP协议发送接收数据时,如果数据过大,接收到的数据会是分包的, 这就是通信的双方发送的消息可能是经过了拆分和合并,尽管使用微信聊天的我们可能并没有感觉。
Netty为我们提供拆分和并的类我们只需要将其加入到处理器中就行。除了拆分和合并,我们还得统一编码,不用担心Netty也提供了。
所以上面的初始化处理器就被改造成了这样:

public class MyNettyServerInit extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

// 加入拆分

pipeline.addLast("decoder",new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,8,0,8));

// 加入合并

pipeline.addLast("prepender",new LengthFieldPrepender(8));

// 加入解码

pipeline.addLast("StringDecoder",new StringDecoder(CharsetUtil.UTF_8));

// 加入编码

pipeline.addLast("StringEncoder",new StringEncoder(CharsetUtil.UTF_8));

// 加入自定义的处理器

pipeline.addLast("MyNettyServerHandler",new MyNettyServerHandler());

}

}

LengthFieldBasedFrameDecoder这个类我们简单解释一下,一般我们称这个类为拆包器,我们用的是这个构造函数:

public LengthFieldBasedFrameDecoder(int maxFrameLength,

int lengthFieldOffset,

int lengthFieldLength,

int lengthAdjustment,

int initialBytesToStrip)

maxFrameLength: 最大的报文长度,然后发送过来的信息超过这个,就会报异常,因此通信的双方需要约定一次说多少,
lengthFieldOffset: 一段报文,从哪开始读
LengthFieldBasedFrameDecoder关于这个类我们要细讲,恐怕这篇文章得再加上8000字。我们姑且就这么理解。

public class MyNettyServerHandler extends SimpleChannelInboundHandler<String> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

//接收消息

System.out.println("服务端 接收到了" + ctx.channel().remoteAddress() + ",消息是:" + msg);

//发送消息

System.out.println("请输入内容:");

String send = new Scanner(System.in).nextLine();

ctx.channel().writeAndFlush(send);

}

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

ctx.writeAndFlush("开始聊天吧...");

}

}

上面我们讲过channelActive这个方法,连接真正建立的时候触发这个方法,为什么这样做呢? 因为我们这个处理器继承的还是SimpleChannelInboundHandler,先读后写,注意这里带的In,就是处理读的,有In就有Out。
聊天的客户端要稍微改动一下:

public class MyNettyClient {

public static void main(String[] args) throws InterruptedException {

EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

// 客户端,所以不是ServerBootStrap

Bootstrap bootstrap = new Bootstrap();

ChannelFuture channelFuture = bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).

handler(new MyNettyClientInit()).connect("127.0.0.1", 8080).sync();

channelFuture.channel().closeFuture().sync();

eventLoopGroup.shutdownGracefully();

}

}

MyNettyClientInit可以复用服务端的:

public class MyNettyClientInit extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 8));

pipeline.addLast("pretender", new LengthFieldPrepender(8));

pipeline.addLast("StringDecoder", new StringDecoder(CharsetUtil.UTF_8));

pipeline.addLast("StringEncoder", new StringEncoder(CharsetUtil.UTF_8));

pipeline.addLast("MyNettyClientHandler", new MyNettyClientHandler());

}

}

客户端处理器

public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

System.out.println("客户端 接收到了消息: " + ctx.channel().remoteAddress() + ",消息是" + msg);

System.out.println("请输入内容:");

String send = new Scanner(System.in).nextLine();

ctx.channel().writeAndFlush(send);

}

}

注意上面的建立连接时,发送的信息只能由一段发出,如果两端同时有,则两端再次陷入等待对方发送信息的状态。

有什么用

心跳机制

微服务正在称为常态的今天,微服务有个核心的组件也被我们所熟知,就是注册中心,注册中心该怎么知道某服务是活着的呢?我们通过心跳来判断人是否活着,那么注册中心也是通过心跳,每隔一段时间和服务进行通信,活着发送或者接收。
在一段时间内没收到信息,那么注册中心就可以认为该服务因为某些原因下线了。Netty也为我们提供了这样的类库,也是Netty内置的一个处理器IdleStateHandler。

 public IdleStateHandler( int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds)

readerIdleTimeSeconds: 每隔几秒读一下
writerIdleTimeSeconds: 每隔几秒写一下
allIdleTimeSeconds: 读或写超时时间
注意是两次间隔,客户端发送过来一次请求,服务端即完成了一次读写。
示例:

public class MyNettyServerInit extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("idleStateEvent", new IdleStateHandler(2,3,6));

pipeline.addLast("myIdleStatteHandler",new MyIdleStateHandler());

}

}

public class MyIdleStateHandler extends SimpleChannelInboundHandler<Object> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

}

// 读超时 或 写超时触发该方法

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

if (evt instanceof IdleStateEvent){

String eventType = null;

IdleStateEvent event = (IdleStateEvent)evt;

switch (event.state()){

case READER_IDLE:

// 客户端发送的两次请求间隔超过两秒触发

eventType = "读超时";

break;

case WRITER_IDLE:

// 客户端发送的两次请求间隔超过三秒触发

eventType = "写超时";

break;

case ALL_IDLE:

// 六秒内读或写都没触发,认为读写超时

eventType = "读写超时";

break;

}

System.out.println(eventType);

ctx.channel().close();

}

}

}

下面我们用curl测试下:
【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
写超时不再测试

netty实现webSocket

WebSocket简介

HTTP协议一向是客户端发起请求,服务端回应,但有的情况下,我们希望服务端主动的向客户端推送。这就是WebSocket协议。在之前服务端要想向客户端主动推送信息,要么是轮询(隔断时间问一下),要么一直是监听(要么是一直不挂电话),有响应之后,再度重新建立HTTP连接。这种方式相当消耗资源。WebSocket就能够做到一次HTTP信息,多次数据发送。

示例

java端示例:

public class NettyServerInitHandler extends ChannelInitializer<SocketChannel> {

// 通道被注册执行此方法

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast("HttpServerCodec",new HttpServerCodec());

// 我们用POST方式请求服务器的时候,对应的参数信息是保存在message body中的,如果只是单纯的用HttpServerCodec

// 是无法完全的解析Http POST请求的,因为HttpServerCodec只能获取uri中参数,所以需要加上

// HttpObjectAggregator.

pipeline.addLast("HttpObjectAggregator", new HttpObjectAggregator(4096));

// 设定地址

pipeline.addLast("WebSocketServerProtocolHandler",new WebSocketServerProtocolHandler("/myWebSocket"));

pipeline.addLast("NettyServerHandler",new MyWebSocketServerHandler());

}

}

public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

System.out.println("server 接收到的客户端消息: " + msg.text());

ctx.channel().writeAndFlush(new TextWebSocketFrame("hello client"));

}

@Override

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("连接建立....");

}

@Override

public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

System.out.println("连接取消建立....");

}

}

<!DOCTYPE html>

<html lang="en">

<head>

<meta charset="UTF-8">

<title>Title</title>

<script type="text/javascript">

// websocket是ws开头

var webSocket = new WebSocket("ws://localhost:8080/myWebSocket");

function sendMessage(msg) {

if (webSocket.readyState == webSocket.OPEN)

webSocket.send(msg);

}

webSocket.onmessage = function (event) {

document.getElementById("tip").innerText = "接收的消息: " + event.data;

}

webSocket.onopen = function (event) {

document.getElementById("tip").innerText = "连接开启";

}

webSocket.onclose = function (event) {

document.getElementById("tip").innerText = "连接关闭";

}

</script>

</head>

<body>

<form>

<textarea name="message">

</textarea>

<input type="button" onclick="sendMessage(this.form.message.value)" value="发送">

</form>

<div id="tip"></div>

</body>

</html>

效果:

顺带讲一下Netty官方指南

这里讲下,Netty开发者指南写的还是挺不错的:
【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
【Java】Netty学习笔记(一)初遇篇
但是对刚写了没多少行代码的初学者来说,看这个就有点晕了,不仅需要良好的英文阅读能力,还需要对网络协议有一定的了解,同时Java基础要扎实,对网络编程要有一定的了解。不然不光看我这篇教程,你也十分晕。
Developer guide 开发者指南

参考资料:

  • 通俗地讲,Netty 能做什么?
  • netty中future.channel().closeFuture().sync()作用
  • Java NIO浅析
  • 如何深刻理解Reactor和Proactor?
  • HTTP Keep-Alive模式
  • netty 的事件驱动
  • 服务器端编程心得(二)—— Reactor模式
  • WebSocket 是什么原理?为什么可以实现持久连接?

以上是 【Java】Netty学习笔记(一)初遇篇 的全部内容, 来源链接: utcz.com/a/114419.html

回到顶部