【Java】基于Netty自定义RPC

基于Netty自定义RPC

RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主

1.是基于HTTP的restful形式的广义远程调用,以spring could的feign和restTemplate为代表,采用的协议是HTTP的7层 调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。

2.是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过netty来实现4层网络协议,NIO来异步传输, 序列化也可以是JSON或者hessian2以及java自带的序列化等,可以配置。

接下来我们主要以第二种的RPC远程调用来自己实现

需求:

​ 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty

步骤

  1. 创建一个公共的接口项目以及创建接口及方法,用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据

1.公共模块

包目录结构如下:

【Java】基于Netty自定义RPC

首先,在公共模块中添加netty的maven依赖

<dependency>

<groupId>io.netty</groupId>

<artifactId>netty-all</artifactId>

<version>4.1.16.Final</version>

</dependency>

提供者(服务端)及消费者(客户端)工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者直接通过接口来进行TCP通信及一定的协议定制获取提供者的实现返回值

接口的定义

/**

* @author 振帅

* @create 2021/1/13 2:10

* @description: IUserService

* 一个普通的接口,参数是支持序列化的String类型,返回值同理

*/

public interface IUserService {

public String sayHello(String msg);

}

2.提供者的实现(服务端)

包目录结构如下:

ServerBoot :启动类,启动服务

UserServiceHandler:自定义的业务处理器

UserServiceImpl:公共模块接口的实现

pom.xml文件需要引入公共模块

<dependencies>

<dependency>

<groupId>com.lagou</groupId>

<artifactId>rpc_common</artifactId>

<version>1.0-SNAPSHOT</version>

</dependency>

</dependencies>

2.1接口的实现

/**

* @author 振帅

* @create 2021/1/13 2:12

* @description: UserServiceImpl

*/

public class UserServiceImpl implements IUserService {

//将来客户端要远程调用的方法

public String sayHello(String msg) {

System.out.println("==>" + msg);

return "服务器返回数据 :" + msg;

}

//创建一个方法启动服务器

public static void startServer(String ip, int port) throws InterruptedException {

//1.创建两个线程池对象

NioEventLoopGroup bossGroup = new NioEventLoopGroup();

NioEventLoopGroup workGroup = new NioEventLoopGroup();

//2.创建服务端的启动引导对象

ServerBootstrap serverBootstrap = new ServerBootstrap();

//3.配置启动引导对象

serverBootstrap.group(bossGroup,workGroup)

//设置通道为NIO

.channel(NioServerSocketChannel.class)

//创建监听channel

.childHandler(new ChannelInitializer<NioSocketChannel>() {

protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {

//获取管道对象

ChannelPipeline pipeline = nioSocketChannel.pipeline();

//给管道对象pipLine 设置编码

pipeline.addLast(new StringEncoder());

pipeline.addLast(new StringDecoder());

//把我们自定义ChannelHandler添加到通道中

pipeline.addLast(new UserServiceHandler());

}

});

//4.绑定端口

serverBootstrap.bind(port).sync();

}

}

netty服务端启动步骤

  1. 创建两个线程池对象(NioEventLoopGroup)

    1)bossGroup:负责接收用户连接,监听客户端请求

    2)workGroup:负责处理用户的io读写操作,处理与客户端的数据通讯

  2. 创建启动引导类
  3. 设置启动引导类

    1)两个线程池添加到组中

    2)设置一个通道类型 NIO

    3)绑定一个初始化监听

  4. 绑定端口
  5. 关闭通道

NioEventLoopGroup

 **一个Netty服务端启动时,通常会有两个NioEventLoopGroup:**一个是监听线程组,主要是监听客户端请求,另一个是工作线程组,主要是处理与客户端的数据通讯。

​ Netty客户端只有一个NioEventLoopGroup,就是用来处理与服务端通信的线程组。

​ NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。

Channel

【Java】基于Netty自定义RPC

  1. Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。
  2. ChannelHandler,核心处理业务就在这里,用于处理业务请求。
  3. ChannelHandlerContext,用于传输业务数据。
  4. ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。

ServerBootstrap

服务端的启动引导对象

客户端的启动引导对象为Bootstrap

ChannelHandler

【Java】基于Netty自定义RPC

ChannelHandler:核心处理业务

ChannelHandler下主要是两个子接口

ChannelInboundHandler(入栈): 处理输入数据和Channel状态类型改变。

​ 适配器: ChannelInboundHandlerAdapter(适配器设计模式)

​ 常用的: SimpleChannelInboundHandler

ChannelOutboundHandler(出栈): 处理输出数据

​ 适配器: ChannelOutboundHandlerAdapter

每一个Handler都一定会处理出栈或者入栈(可能两者都处理数据),例如对于入栈的Handler可能会继承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,

而SimpleChannelInboundHandler又是继承于ChannelInboundHandlerAdapter,最大的区别在于SimpleChannelInboundHandler会对没有外界引用的资源进行一定的清理,

并且入栈的消息可以通过泛型来规定。

这里为什么有设配器模式呢?

我们在写自定义Handel时候,很少会直接实现上面两个接口,因为接口中有很多默认方法需要实现,所以这里就采用了设配器模式,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter就是设配器模式的产物,让它去实现上面接口,实现它所有方法。那么你自己写自定义Handel时,只要继承它,就无须重写上面接口的所有方法了。

ChannelInitializer

实现了ChannelHandler创建serverbootstrap的时候会经常用到,它用于对刚刚接收的channel进行初始化

ChannelPipeline

管道对象

ChannelPipeline类是ChannelHandler实例对象的链表,用于处理或截获通道的接收和发送数据。它提供了一种高级的截取过滤模式(类似serverlet中的filter功能),让用户可以在ChannelPipeline中完全控制一个事件以及如何处理ChannelHandler与ChannelPipeline的交互。

​ 对于每个新的通道Channel,都会创建一个新的ChannelPipeline,并将器pipeline附加到channel中。

//在ChannelPipeline的第一个位置添加ChannelHandler

addFirst(...)

//在ChannelPipeline的末尾添加ChannelHandler

addLast(...)

网络只能传输字节数据,

netty发送或接收消息后,必须将消息数据从一种形式转化为另一种。

接收消息后,需要将消息从字节码转成java对象(由某种解码器解码);

发送消息前,需要将java对象转成字节(由某种类型的编码器进行编码)。

//给管道对象pipLine 设置编码解码

pipeline.addLast(new StringEncoder());

pipeline.addLast(new StringDecoder());

//把我们自定义ChannelHandler添加到通道中

pipeline.addLast(new UserServiceHandler());

2.2自定义ChannelHandler

继承ChannelInboundHandlerAdapter 重写channelRead方法,当客户端读取数据时,该方法会被调用

/**

* @author 振帅

* @create 2021/1/13 2:27

* @description: UserServiceHandler 自定义的业务处理器

*/

public class UserServiceHandler extends ChannelInboundHandlerAdapter {

/**

* 当客户端读取数据时,该方法会被调用

* @param ctx

* @param msg

* @throws Exception

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//注意 客户端将来发送请求的时候会传递一个参数:UserService#sayHello#are you ok

//1.判断当前的请求是否符合规则

if (msg.toString().startsWith("UserService")) {

//2.如果符合规则,调用实现类获取一个result

UserServiceImpl userService = new UserServiceImpl();

String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));

//3.把调用实现类的方法获取的结果写到客户端

ctx.writeAndFlush(result);

}

}

}

ChannelHandlerContext

上下文对象 存储handler信息 写操作

我们先约定客户端传递参数的格式为:UserService#sayHello# + msg;

所以这里需要 msg.toString().substring(msg.toString().lastIndexOf("#") + 1)来获取msg的信息。

2.3服务端的启动

/**

*启动类

*/

public class ServerBoot {

public static void main(String[] args) throws InterruptedException {

//启动服务器

UserServiceImpl.startServer("127.0.0.1",8998);

}

}

3.消费者的实现(客户端)

包目录结构如下:

ConsumerBoot :启动类,启动服务

RPCConsumer:消费者

UserClientHandler:自定义Handler

3.1RPCConsumer的实现

主要有以下几个步骤

  1. 创建一个线程池对象 -- 它要处理我们自定义事件
  2. 声明一个自定义事件处理器 UserClientHandler
  3. 编写方法,初始化客户端(创建连接池 bootStrap 设置bootStrap 连接服务器)
  4. 编写一个方法,使用jdk动态代理创建对象

/**

* 消费者

*/

public class RPCConsumer {

/**

* 1.创建一个线程池对象 -- 它要处理我们自定义事件

*/

private static ExecutorService executorService =

//线程池线程数以当前CPU核数为准

Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

/**

* 2.声明一个自定义事件处理器 UserClientHandler

*/

private static UserClientHandler userClientHandler;

/**

* 3.编写方法,初始化客户端(创建连接池 bootStrap 设置bootStrap 连接服务器)

*/

public static void initClient() throws InterruptedException {

// 1).初始化UserClientHandler

userClientHandler = new UserClientHandler();

// 2).创建连接池对象

NioEventLoopGroup group = new NioEventLoopGroup();

// 3).创建客户端的引导对象

Bootstrap bootstrap = new Bootstrap();

// 4).配置引导对象

bootstrap.group(group)

//设置通道为NIO

.channel(NioSocketChannel.class)

//设置请求协议为TCP

.option(ChannelOption.TCP_NODELAY,true)

//监听channel 并初始化

.handler(new ChannelInitializer<SocketChannel>() {

protected void initChannel(SocketChannel socketChannel) throws Exception {

//获取管道

ChannelPipeline pipeline = socketChannel.pipeline();

//设置编码

pipeline.addLast(new StringEncoder());

pipeline.addLast(new StringDecoder());

//添加自定义事件处理器

pipeline.addLast(userClientHandler);

}

});

// 5).连接服务器

bootstrap.connect("127.0.0.1",8998).sync();

}

/**

* 4.编写一个方法,使用jdk动态代理创建对象

*/

public static Object createProxy(Class<?> serviceClass, final String providerParam) {

return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),

new Class[]{serviceClass},

new InvocationHandler() {

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

//1.初始化客户端client

if (userClientHandler ==null) {

initClient();

}

//2.给UserClientHandler 设置param参数

userClientHandler.setParam(providerParam + args[0]);

//3.使用线程池,开启一个线程处理call() 写操作 并返回结果

Object result = executorService.submit(userClientHandler).get();

//4.return 结果

return result;

}

});

}

}

Executors

主要用于提供线程池相关的操作,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。

//创建固定数目线程的线程池。

public static ExecutorService newFiexedThreadPool(int Threads)

Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

ExecutorService

Runtime.getRuntime().availableProcessors()

线程池线程数以当前CPU核数为准

Runtime.getRuntime().availableProcessors()方法询问jvm,jvm去问操作系统,操作系统去问硬件 。

3.2初始化客户端

和服务端的类似,不过客户端只需要创建一个连接池对象

客户端的引导对象是BootStrap,而服务端是ServerBootstrap

option主要是针对boss线程组,child主要是针对worker线程组

option / handler / attr 方法

option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;

handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来处理Acceptor的操作;对于客户端的SocketChannel,主要是用来处理 业务操作;

attr: 设置通道的属性;

option / handler / attr方法都定义在AbstractBootstrap中, 所以服务端和客户端的引导类方法调用都是调用的父类的对应方法。

childHandler / childOption / childAttr 方法(只有服务端ServerBootstrap才有child类型的方法)

  对于服务端而言,有两种通道需要处理, 一种是ServerSocketChannel:用于处理用户连接的accept操作, 另一种是SocketChannel,表示对应客户端连接。而对于客户端,一般都只有一种channel,也就是SocketChannel。

  因此以child开头的方法,都定义在ServerBootstrap中,表示处理或配置服务端接收到的对应客户端连接的SocketChannel通道。

3.3UserClientHandler自定义事件处理器

public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {

//1.定义成员变量

private ChannelHandlerContext context;//事件处理器上下文对象(存储handler信息 写操作)

private String result;//记录服务器返回的数据

private String param;//记录将要返回给服务器的数据

//2.实现channelActive 客户端和服务器连接时,该方法就自动执行

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

//初始化ChannelHandlerContext

this.context = ctx;

}

//3.实现channelRead 当我们读到服务器数据时,该方法自动执行

//synchronized 同步

@Override

public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//将读到的服务器的数据msg设置为成员变量的值

this.result = msg.toString();

//唤醒写操作

notify();

}

//4.将客户端的数据写到服务器

public synchronized Object call() throws Exception {

//context给服务器写数据

context.writeAndFlush(param);

wait();

return result;

}

//5.设置参数的方法

public void setParam(String param){

this.param = param;

}

}

3.4客户端的启动

public class ConsumerBoot {

//定义参数

private static final String PROVIDER_NAME = "UserService#sayHello#";

public static void main(String[] args) throws InterruptedException {

//1.创建代理对象

IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDER_NAME);

//2循环给服务器写数据

while (true) {

String result = service.sayHello("are you ok !!");

System.out.println(result);

Thread.sleep(2000);

}

}

}

以上是 【Java】基于Netty自定义RPC 的全部内容, 来源链接: utcz.com/a/98841.html

回到顶部