Java学习笔记17Netty实战及优化

编程

Java学习笔记17-Netty实战及优化

尝试自己实现

短连接:请求/响应之后,关闭已经建立的TCP连接,下次请求再建立一次连接。

长连接:请求/响应之后,不关闭TCP连接,多次请求,复用同一个连接。

为了避免频繁创建连接/释放连接带来的性能损耗,以及消息获取的实时性,采用长连接的形式。

粘包:Nagle算法-客户端累积一定量或者缓冲一段时间再传输。服务端缓冲区堆积。导致多个请求数据粘在一起。

拆包:发送的数据大于发送缓冲区,进行分片传输。服务端缓冲区堆积,导致服务端读取的请求数据不完整。

使用WebSocket

WebSocket协议是基于TCP的一种新的网络协议。

它的出现实现了浏览器与服务器全双工(full-duplex)通信:允许服务器主动发送信息给客户端。

多客户端多语言多服务器支持:浏览器、php、Java、ruby、nginx、python、Tomcat、erlang、.net等等

连接过程:

  1. 客户端 请求连接 -- GET /chat HTTP/1.1 -->> 服务端
  2. 客户端 <<-- HTTP/1.1 101 xxx -- 服务端 返回响应
  3. 客户端 open <<-- push -- 服务端
  4. 客户端 -- send -->> 服务端

WebSocket测试代码

WebSocketServer 服务端

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

public class WebSocketServer {

static int port = 20480;

public static void main(String[] args) throws Exception {

EventLoopGroup mainGroup = new NioEventLoopGroup(1);

EventLoopGroup subGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(mainGroup, subGroup).channel(NioServerSocketChannel.class)

.handler(new LoggingHandler(LogLevel.DEBUG))

.option(ChannelOption.SO_REUSEADDR, true)

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new HttpServerCodec());

pipeline.addLast(new HttpObjectAggregator(65536));

pipeline.addLast(new WebSocketServerHandler());

}

})

.childOption(ChannelOption.SO_REUSEADDR, true);

ChannelFuture f = b.bind(port).addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

System.out.println("端口绑定完成:" + future.channel().localAddress());

}

}).sync();

// 获取键盘输入

BufferedReader input = new BufferedReader(new InputStreamReader(System.in, "UTF-8"));

new Thread(() -> {

String msg;

// 接收从键盘发送过来的数据

try {

System.out.print("请输入信息:");

while ((msg = input.readLine()) != null) {

WebSocketSession.pushMsg(msg);

if (msg.length() == 0) {

break;

}

}

} catch (IOException e) {

e.printStackTrace();

}

}).start();

f.channel().closeFuture().sync();

} finally {

mainGroup.shutdownGracefully();

subGroup.shutdownGracefully();

}

}

}

WebSocketServerHandler 服务端Handler

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.*;

import io.netty.handler.codec.http.websocketx.*;

import io.netty.util.AttributeKey;

import io.netty.util.CharsetUtil;

import java.util.List;

import java.util.Map;

import java.util.concurrent.atomic.LongAdder;

import static io.netty.handler.codec.http.HttpMethod.GET;

import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

private static final String WEBSOCKET_PATH = "/websocket";

private WebSocketServerHandshaker handshaker;

public static final LongAdder counter = new LongAdder();

@Override

public void channelRead0(ChannelHandlerContext ctx, Object msg) {

counter.add(1);

if (msg instanceof FullHttpRequest) {

System.out.println("Http请求");

handleHttpRequest(ctx, (FullHttpRequest) msg);

} else if (msg instanceof WebSocketFrame) {

System.out.println("WebSocket请求");

handleWebSocketFrame(ctx, (WebSocketFrame) msg);

}

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) {

ctx.flush();

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

cause.printStackTrace();

ctx.close();

}

private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {

// 如果http解码失败,则返回http异常,并且判断消息头有没有包含Upgrade字段(协议升级)

if (!req.decoderResult().isSuccess() || req.method() != GET || (!"websocket".equals(req.headers().get("Upgrade")))) {

System.out.println("Http异常");

sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));

return;

}

// 构造握手响应返回

WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(

getWebSocketLocation(req), null, true, 5 * 1024 * 1024);

handshaker = wsFactory.newHandshaker(req);

if (handshaker == null) {

// 版本不支持

WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());

} else {

System.out.println("WebSocket握手响应");

handshaker.handshake(ctx.channel(), req);

// 解析请求,判断token

Map<String, List<String>> parameters = new QueryStringDecoder(req.uri()).parameters();

// 连接限制,需要验证token,拒绝陌生连接

String token = parameters.get("token").get(0);

// 保存token

ctx.channel().attr(AttributeKey.valueOf("token")).getAndSet(token);

// 保存会话

WebSocketSession.saveSession(token, ctx.channel());

// 结束

}

}

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

// 关闭

if (frame instanceof CloseWebSocketFrame) {

Object token = ctx.channel().attr(AttributeKey.valueOf("token")).get();

WebSocketSession.removeSession(token.toString());

handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());

return;

}

// ping/pong作为心跳

if (frame instanceof PingWebSocketFrame) {

System.out.println("ping: " + frame);

ctx.write(new PongWebSocketFrame(frame.content().retain()));

return;

}

// 文本

if (frame instanceof TextWebSocketFrame) {

String text = ((TextWebSocketFrame) frame).text();

String outStr = text

+ ", 欢迎使用Netty WebSocket服务, 现在时刻:"

+ new java.util.Date().toString();

System.out.println("收到:" + text);

System.out.println("发送:" + outStr);

//发送到客户端websocket

ctx.channel().write(new TextWebSocketFrame(outStr));

return;

}

// 不处理二进制消息

if (frame instanceof BinaryWebSocketFrame) {

// Echo the frame

ctx.write(frame.retain());

}

}

private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {

// Generate an error page if response getStatus code is not OK (200).

if (res.status().code() != 200) {

ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);

res.content().writeBytes(buf);

buf.release();

HttpUtil.setContentLength(res, res.content().readableBytes());

}

// Send the response and close the connection if necessary.

ChannelFuture f = ctx.channel().writeAndFlush(res);

if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {

f.addListener(ChannelFutureListener.CLOSE);

}

}

private static String getWebSocketLocation(FullHttpRequest req) {

String location = req.headers().get(HttpHeaderNames.HOST) + WEBSOCKET_PATH;

return "ws://" + location;

}

}

WebSocketSession 服务端会话连接池

import io.netty.channel.Channel;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Random;

import java.util.concurrent.ConcurrentHashMap;

public class WebSocketSession {

/**

* 会话连接池

*/

static ConcurrentHashMap<String, Channel> sessionMap = new ConcurrentHashMap<>();

/**

* 保存会话

*

* @param sessionID

* @param channel

*/

public static void saveSession(String sessionID, Channel channel) {

sessionMap.put(sessionID, channel);

}

/**

* 移除会话

*

* @param sessionID

*/

public static void removeSession(String sessionID) {

sessionMap.remove(sessionID);

}

/**

* 推送消息

* @param msg

*/

public static void pushMsg(String msg) {

try {

if (sessionMap.isEmpty()) {

return;

}

int size = sessionMap.size();

ConcurrentHashMap.KeySetView<String, Channel> keySetView = sessionMap.keySet();

String[] keys = keySetView.toArray(new String[]{});

System.out.println(WebSocketServerHandler.counter.sum() + " : 当前用户数量" + keys.length);

for (int i = 0; i < size; i++) {

// 提交任务给它执行

String key = keys[new Random().nextInt(size)];

Channel channel = sessionMap.get(key);

if (channel == null) {

continue;

}

if (!channel.isActive()) {

sessionMap.remove(key);

continue;

}

channel.eventLoop().execute(() -> {

System.out.println("推送:" + msg);

channel.writeAndFlush(new TextWebSocketFrame(msg));

});

}

} catch (Exception ex) {

ex.printStackTrace();

}

}

}

WebSocketClient 客户端

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.http.HttpClientCodec;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

public class WebSocketClient {

static String host = "127.0.0.1";

static int port = 20480;

public static void main(String[] args) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

try {

Bootstrap b = new Bootstrap();

b.group(group).channel(NioSocketChannel.class)

.handler(new LoggingHandler(LogLevel.DEBUG))

.option(ChannelOption.SO_REUSEADDR, true)

.handler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel ch) {

ChannelPipeline p = ch.pipeline();

p.addLast(new HttpClientCodec());

p.addLast(new HttpObjectAggregator(8192));

p.addLast(WebSocketClientCompressionHandler.INSTANCE);

p.addLast(new WebSocketClientHandler());

}

});

ChannelFuture f = b.connect(host, port).sync();

f.channel().closeFuture().sync();

} finally {

group.shutdownGracefully();

}

}

}

WebSocketClientHandler 客户端Handler

import io.netty.channel.*;

import io.netty.handler.codec.http.DefaultHttpHeaders;

import io.netty.handler.codec.http.FullHttpResponse;

import io.netty.handler.codec.http.websocketx.*;

import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

import java.net.URI;

import java.util.concurrent.atomic.AtomicInteger;

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

private static final String WEBSOCKET_PATH = "/websocket";

private WebSocketClientHandshaker handshaker;

private ChannelPromise handshakeFuture;

static AtomicInteger counter = new AtomicInteger(0);

public ChannelFuture handshakeFuture() {

return handshakeFuture;

}

@Override

public void handlerAdded(ChannelHandlerContext ctx) {

handshakeFuture = ctx.newPromise();

}

@Override

public void channelActive(ChannelHandlerContext ctx) {

if (handshaker == null) {

InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();

URI uri = null;

try {

uri = new URI("ws://" + address.getHostString() + ":" + address.getPort()

+ WEBSOCKET_PATH + "?token=" + counter.incrementAndGet());

} catch (Exception e) {

e.printStackTrace();

}

handshaker = WebSocketClientHandshakerFactory.newHandshaker(

uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());

}

handshaker.handshake(ctx.channel());

}

@Override

public void channelInactive(ChannelHandlerContext ctx) {

System.out.println("客户端已断开");

}

@Override

public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

Channel ch = ctx.channel();

if (!handshaker.isHandshakeComplete()) {

try {

handshaker.finishHandshake(ch, (FullHttpResponse) msg);

System.out.println("客户端已连接");

handshakeFuture.setSuccess();

} catch (WebSocketHandshakeException e) {

System.out.println("客户端连接失败");

handshakeFuture.setFailure(e);

}

return;

}

if (msg instanceof FullHttpResponse) {

FullHttpResponse response = (FullHttpResponse) msg;

throw new IllegalStateException(

"Unexpected FullHttpResponse (getStatus=" + response.status() +

", content=" + response.content().toString(CharsetUtil.UTF_8) + ")");

}

WebSocketFrame frame = (WebSocketFrame) msg;

if (frame instanceof TextWebSocketFrame) {

TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;

System.out.println("已收到:" + textFrame.text());

} else if (frame instanceof PongWebSocketFrame) {

System.out.println("收到心跳");

} else if (frame instanceof CloseWebSocketFrame) {

System.out.println("收到关闭");

ch.close();

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

cause.printStackTrace();

if (!handshakeFuture.isDone()) {

handshakeFuture.setFailure(cause);

}

ctx.close();

}

}

Linux服务器优化

网络小知识:区分不同连接的方式,TCP连接四元组

服务器IP+服务器PORT+客户端IP+客户端PORT

服务器最大连接数:理论上是无限的,但在Linux中会受到最大文件连接数、内存等的影响,在TCP、UDP协议的开头,会分别有16位来存储源端口号和目标端口号,2^16-1=65535个,所以传输层协议限制了最多只有65535个端口,但实际可以通过重复使用端口来提高连接数。

# 进程级最大文件连接数

vi /etc/security/limits.conf

* soft nofile 1000000

* hard nofile 1000000

vi /etc/sysctl.conf

fs.file-max=1000000 #系统级最大文件连接数

net.ipv4.ip_local_port_range=1024 65535 #新连接本地端口范围

net.ipv4.tcp_tw_reuse=1 #开启重新使用

net.ipv4.tcp_tw_recycle=1 #开启加速回收

sysctl -p

Netty优化

1. Handler对象复用

@ChannelHandler.Sharable 标识为可共享

每个连接一个Channel,独占一个Pipeline

Pipeline共享handler对象,不再new handler()

注意:防止handler中有共享变量,导致线程安全问题。

2. 耗时操作引入业务线程池

处理过程中,耗时业务逻辑会占用I/O线程导致阻塞,应单独交给指定线程池处理

可在添加handler的时候,指定EventLoopGroup业务专属线程池

pipeline.addLast(businessGroup, businessHandller);

3. 响应内容,必定经过Netty I/O

查看源码可知,业务handler中write最终会通过Netty I/O发送

单次write数据量过大,会导致I/O线程被一个连接长时间占用,导致用户体验变差

可调整操作系统TCP缓存区大小

业务handler中将单次数据量过大的write,分批多次进行小数据量的write,加快运转速度,提高用户体验。

4. ByteBuf复用机制

NIOEventLoop收到Selector通知,然后进入read环节,申请一个ByteBuf对象,作为数据的载体,最后转交handler进行业务处理,ByteBuf采用引用计数,Release释放之后才能被回收,ByteBuf对象可通过ctx.write(msg)来进行ReferenceCountUtil.release()释放,也可通过((ByteBuf) msg).release()释放,来进行ByteBuf回收复用。

优化2-提高请求/推送的吞吐量

  1. 业务操作提交到单独的线程执行。
  2. 调整TCP缓冲区大小,提高网路吞吐量。
  3. 基于Netty框架开发时,业务代码逻辑的调优。
  4. 结合Netty框架特点,复用对象,实现性能提升。

小结

  1. 并发连接数主要靠操作系统参数调优;
  2. 吞吐量的提升,主要靠代码处理能力来提升;
  3. 有时候网络和磁盘会成为瓶颈;
  4. 水平扩展,集群的方式是最终方案;
  5. Netty框架的运作机制很重要;

以上是 Java学习笔记17Netty实战及优化 的全部内容, 来源链接: utcz.com/z/511365.html

回到顶部