Netty的部分案例实现

编程

  1. Netty简单入门案例实现

  • server端

package com.shi.netty.netty1.simple;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

* @author shiye

* @create 2020-06-10 10:24

*/

public class NettyServer {

public static void main(String[] args) throws InterruptedException {

//创建bossGroup 和 workerGroup

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup(8);

//创建服务器端启动对象,配置参数

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup) //设置俩个线程组

.channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 做为服务器通道的实现

.option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数

.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态

.childHandler(new ChannelInitializer<SocketChannel>() {

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast(new NettyServerHandler());

}

});//给我们的workerGroup 的 Eventloop 对应的管道设置处理器

System.out.println("........server is ready..........");

//启动服务器(并绑定端口)

ChannelFuture channelFuture = bootstrap.bind(7777).sync();

//对关闭通道进行监听

channelFuture.channel().closeFuture().sync();

}

}

package com.shi.netty.netty1.simple;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.util.CharsetUtil;

import java.util.concurrent.TimeUnit;

/**

* 自定义方法处理器

*

* @author shiye

* @create 2020-06-10 11:01

* 我们自定义一个handler需要集成 netty 规定好的 handlerAdapter

*/

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

/**

* 读取数据的方法

*

* @param ctx 上下文对象,其中包含管道pipeline,通道channel,地址

* @param msg 客户端发送的数据

* @throws Exception

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// System.out.println("服务器启动的线程组为:" + Thread.currentThread().getName());

// System.out.println("server ctx = " + ctx);

// //将msg转成butbuf

// ByteBuf byteBuf = (ByteBuf) msg;

// System.out.println("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));

System.out.println("客户端地址:" + ctx.channel().remoteAddress());

//***********如果这边有一个任务执行的时间非常长,不能让客户端进行等待***********//

//解决方案1:把需要长时间执行的任务放到NIOEventLoop的taskQueue中(如果有多个任务时间是叠加的)

// ctx.channel().eventLoop().execute(() -> {

// try {

// TimeUnit.SECONDS.sleep(10);

// ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端终于执行完了客户端的任务了 ...", CharsetUtil.UTF_8));

//

// } catch (InterruptedException e) {

// e.printStackTrace();

// }

// });

//解决方案2:用户自定义定时任务-》该任务提交到scheduletaskqueue中

ctx.channel().eventLoop().schedule(() -> {

try {

TimeUnit.SECONDS.sleep(10);

ctx.writeAndFlush(Unpooled.copiedBuffer("服务器端终于执行完了客户端的任务了 ...", CharsetUtil.UTF_8));

} catch (InterruptedException e) {

e.printStackTrace();

}

}, 5, TimeUnit.SECONDS);

}

/**

* 读取数据完成

*

* @param ctx

* @throws Exception

*/

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

//把数据写入缓存,并且刷新

ctx.writeAndFlush(Unpooled.copiedBuffer("hello client~ ...", CharsetUtil.UTF_8));

}

/**

* 捕捉异常信息

*

* @param ctx

* @param cause

* @throws Exception

*/

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

System.out.println("NettyServerHandler 捕捉到了异常..." + cause);

ctx.close();

}

}

  • 客户端

package com.shi.netty.netty1.simple;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

/**

* 客户端

*

* @author shiye

* @create 2020-06-10 11:25

*/

public class NettyClient {

public static void main(String[] args) throws InterruptedException {

int count = 500;

// List<Integer> list = new ArrayList<Integer>();

// for (int i = 0; i < count; i++) {

// list.add(i);

// }

//

// list.parallelStream().forEach((temp) -> {

// new Thread(() -> {

// NettyClient nettyClient = new NettyClient();

// try {

// nettyClient.pushClint();

// } catch (InterruptedException e) {

// e.printStackTrace();

// }

// }).start();

// });

NettyClient nettyClient = new NettyClient();

nettyClient.pushClint();

}

public void pushClint() throws InterruptedException {

//客户端需要一个事件监听组

EventLoopGroup eventGroup = new NioEventLoopGroup();

try {

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(eventGroup) //设置线程组

.channel(NioSocketChannel.class) //设置客户端通道的实现类

.handler(new ChannelInitializer<SocketChannel>() {

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new NettyClientHandler());

}

});

//启动客户端去连接服务器端

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7777).sync();

//给关闭通道进行监听

channelFuture.channel().closeFuture().sync();

} finally {

//优雅的关闭服务器

eventGroup.shutdownGracefully();

}

}

}

package com.shi.netty.netty1.simple;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.util.CharsetUtil;

/**

* @author shiye

* @create 2020-06-10 11:41

*/

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

/**

* 当通道就绪就会激活改方法

*

* @param ctx

* @throws Exception

*/

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println("client:" + ctx);

ctx.writeAndFlush(Unpooled.copiedBuffer("server hello, I,m client....", CharsetUtil.UTF_8));

}

/**

* 当通道有读取事件时,会触发

*

* @param ctx

* @param msg

* @throws Exception

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf byteBuf = (ByteBuf) msg;

System.out.println("客户端接收到的消息:" + byteBuf.toString(CharsetUtil.UTF_8));

System.out.println("远程服务器端的地址:" + ctx.channel().remoteAddress());

}

}

 

2. 使用netty来接收http请求的简单案例

  • 服务器端:

package com.shi.netty.netty1.http;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

* 测试netty接收http请求

*

* @author shiye

* @create 2020-06-15 11:06

*/

public class TestHttpServer {

public static void main(String[] args) {

//创建bossGroup 和 workerGroup

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {

//创建服务器端启动对象,配置参数

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new TestServerInitializer());

ChannelFuture channelFuture = bootstrap.bind(6688).sync();

channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

}

package com.shi.netty.netty1.http;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.*;

import io.netty.util.CharsetUtil;

/**

* SimpleChannelInboundHandler 集成自 ChannelInboundHandlerAdapter

* HttpObject 是客户端和服务器端相互通讯的数据被封装成 HttpObject

*

* @author shiye

* @create 2020-06-15 11:08

*/

public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

@Override

protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {

System.out.println(" pipeline:" + ctx.pipeline().hashCode() + " Handler:" + this.hashCode());

System.out.println("msg 类型是:" + msg.getClass());

System.out.println("客户端地址:" + ctx.channel().remoteAddress());

if (msg instanceof HttpRequest) {

ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器端,收到了你的请求...", CharsetUtil.UTF_8);

//构造一个httpresponse对象并返回

HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");

response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

//返回

ctx.writeAndFlush(response);

}

}

}

package com.shi.netty.netty1.http;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.http.HttpServerCodec;

/**

* @author shiye

* @create 2020-06-15 11:08

*/

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

//向管道中假如处理器

//得到管道

ChannelPipeline pipeline = ch.pipeline();

//假如一个netty提供得HttpServerCodec 是HTTP编码解码器

pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());

//添加一个自定义得handler

pipeline.addLast(new TestHttpServerHandler());

}

}

  • 浏览器测试: http://localhost:6688/

以上是 Netty的部分案例实现 的全部内容, 来源链接: utcz.com/z/517483.html

回到顶部