Java 实现简单的 RPC 框架

java

RPC 简介

RPC,全称为 Remote Procedure Call,即远程过程调用,它是一个计算机通信协议。它允许像调用本地服务一样调用远程服务。它可以有不同的实现方式,而不需要了解底层网络技术的协议。 RPC 协议假定某些传输协议的存在,如 TCP 或 UDP,为通信程序之间携带信息数据。如 RMI(远程方法调用)、Hessian、Http invoker 等。

怎样实现一个 RPC 框架

RPC 能够让本地应用简单、高效地调用服务器中的过程。它主要应用在分布式系统。如 Hadoop 中的 IPC 组件。但怎样实现一个 RPC 框架呢?
可以从下面几个方面思考:

  • 通信模型:假设通信的为 A 机器与 B 机器,A 与 B 之间有通信模型,在 Java 中一般基于 BIO 或 NIO。
  • 过程(服务)定位:使用给定的通信方式,与确定 IP 与端口及方法名称确定具体的过程或方法;
  • 远程代理对象:本地调用的方法(服务)其实是远程方法的本地代理,因此可能需要一个远程代理对象,对于 Java 而言,远程代理对象可以使用 Java 的动态对象实现,封装了调用远程方法调用;
  • 序列化,将对象名称、方法名称、参数等对象信息进行网络传输需要转换成二进制传输,这里可能需要不同的序列化技术方案。如:protobuf,Arvo 等。

RPC 框架架构

RPC 架构分为三部分:

  • 服务提供者,运行在服务器端,提供服务接口定义与服务实现类。
  • 服务中心,运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。
  • 服务消费者,运行在客户端,通过远程代理对象调用远程服务。

RPC 框架的简单实现

这里我只介绍服务提供者和客户端的实现方式。

服务提供者

服务提供者 IHello 接口定义:

public interface IHello {

String sayHello(String string);

}

服务提供者 IHello 接口实现:

public class HelloImpl implements IHello {

@Override

public String sayHello(String string) {

return "Hello:" + string;

}

}

服务端 RpcProxyServer 类:

public class RpcProxyServer {

ExecutorService executorService = Executors.newCachedThreadPool();

public void publisher(Object service, int port) {

ServerSocket serverSocket = null;

try {

// 启动 socket 服务

serverSocket = new ServerSocket(port);

while (true) {

Socket socket = serverSocket.accept();

executorService.execute(new ProcessorHandler(service, socket));

}

} catch (IOException e) {

e.printStackTrace();

}

}

}  

服务端 RpcRequest 类:

public class RpcRequest implements Serializable {

private static final long serialVersionUID = 383378368319625542L;

private String className;

private String methodName;

private Object[] params;

public String getClassName() {

return className;

}

public void setClassName(String className) {

this.className = className;

}

public String getMethodName() {

return methodName;

}

public void setMethodName(String methodName) {

this.methodName = methodName;

}

public Object[] getParams() {

return params;

}

public void setParams(Object[] params) {

this.params = params;

}

@Override

public String toString() {

return "RpcRequest{" +

"className='" + className + '\'' +

", methodName='" + methodName + '\'' +

", params=" + Arrays.toString(params) +

'}';

}

}

服务端 ProcessorHandler 类:

import java.io.IOException;

import java.io.ObjectInputStream;

import java.io.ObjectOutputStream;

import java.lang.reflect.InvocationTargetException;

import java.lang.reflect.Method;

import java.net.Socket;

public class ProcessorHandler implements Runnable {

Socket socket;

Object service;

public ProcessorHandler(Object service, Socket socket) {

this.socket = socket;

this.service = service;

}

@Override

public void run() {

System.out.println("begin processor handler!");

ObjectInputStream objectInputStream = null;

try {

objectInputStream = new ObjectInputStream(socket.getInputStream());

RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();

Object restlt = invoke(rpcRequest);

ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

objectOutputStream.writeObject(restlt);

objectOutputStream.flush();

objectInputStream.close();

} catch (Exception e) {

e.printStackTrace();

} finally {

if (objectInputStream != null) {

try {

objectInputStream.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

private Object invoke(RpcRequest request) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {

Object[] args = request.getParams();

Class<?>[] types = new Class[args.length];

for (int i = 0; i < args.length; i++) {

types[i] = args[i].getClass();

}

Method method = service.getClass().getMethod(request.getMethodName(), types);

return method.invoke(service, args);

}

}

服务端主类 RpcServerMain:

public class RpcServerMain {

public static void main(String[] args) {

IHello hello = new HelloImpl();

RpcProxyServer rpcProxyServer = new RpcProxyServer();

rpcProxyServer.publisher(hello, 8080);

System.out.println(hello.sayHello("charles"));

}

}

客户端

客户端 IHello 类:

public interface IHello {

String sayHello(String string);

}

客户端 RpcClientProxy 类:

public class RpcClientProxy {

public <T> T clientProxy(Class<T> interfaceCls, String host, int port) {

return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));

}

}

客户端 RpcRequest 类:

public class RpcRequest implements Serializable {

private static final long serialVersionUID = 383378368319625542L;

private String className;

private String methodName;

private Object[] params;

public String getClassName() {

return className;

}

public void setClassName(String className) {

this.className = className;

}

public String getMethodName() {

return methodName;

}

public void setMethodName(String methodName) {

this.methodName = methodName;

}

public Object[] getParams() {

return params;

}

public void setParams(Object[] params) {

this.params = params;

}

@Override

public String toString() {

return "RpcRequest{" +

"className='" + className + '\'' +

", methodName='" + methodName + '\'' +

", params=" + Arrays.toString(params) +

'}';

}

}

客户端 RpcNetTransport 类:

import java.io.IOException;

import java.io.ObjectInputStream;

import java.io.ObjectOutput;

import java.io.ObjectOutputStream;

import java.net.Socket;

public class RpcNetTransport {

String host;

int port;

public RpcNetTransport(String host, int port) {

this.host = host;

this.port = port;

}

private Socket createSocket() {

System.out.println("Begin create socket connect!");

Socket socket = null;

try {

socket = new Socket(host, port);

} catch (Exception e) {

throw new RuntimeException("build connect failed.");

}

return socket;

}

public Object send(RpcRequest rpcRequest) {

Socket socket = null;

try {

socket = createSocket();

ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

objectOutputStream.writeObject(rpcRequest);

objectOutputStream.flush();

// 返回结果接收

ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());

Object resultObject = objectInputStream.readObject();// 反序列化 对象

objectInputStream.close();

objectOutputStream.close();

return resultObject;

} catch (Exception e) {

throw new RuntimeException("send request exception:" + e);

} finally {

if (socket != null) {

try {

socket.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

}

客户端 RemoteInvocationHandler 类:

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

public class RemoteInvocationHandler implements InvocationHandler {

String host;

int port;

public RemoteInvocationHandler(String host, int port) {

this.host = host;

this.port = port;

}

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

RpcRequest rpcRequest = new RpcRequest();

rpcRequest.setClassName(method.getDeclaringClass().getName());

rpcRequest.setMethodName(method.getName());

rpcRequest.setParams(args);

RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);

return rpcNetTransport.send(rpcRequest);

}

}

客户端主类 RpcClientMain:

public class RpcClientMain {

public static void main(String[] args) {

RpcClientProxy rpcClientProxy = new RpcClientProxy();

IHello hello = rpcClientProxy.clientProxy(IHello.class, "localhost", 8080);

System.out.println(hello.sayHello("charles"));

}

}

项目启动后客户端向服务端发送了一条消息,分别运行两个项目后输出结果如下

服务端:

begin processor handler!

客户端:

Begin create socket connect!

Hello:charles

总结

RPC 本质为消息处理模型,RPC 屏蔽了底层不同主机间的通信细节,让进程调用远程的服务就像是本地的服务一样。

以上是 Java 实现简单的 RPC 框架 的全部内容, 来源链接: utcz.com/z/390732.html

回到顶部