Dubbo 服务端接收请求过程分析
开篇
接上个部分(Dubbo 客户端调用链路过程分析)讲到了客户端发送请求的过程,这个部分我们分析服务端接收请求并发送响应的过程。
在分析服务暴露的过程中,provider启动netty服务端的时候(NettyServer.doOpen),会在在ChannelPipeline链中加入了4个ChannelHandler。
- NettyCodecAdapter.InternalEncoder:编码器- NettyCodecAdapter.InternalDecoder:解码器
- IdleStateHandler:心跳处理器
- NettyServerHandler:请求处理器
- 在服务端接收客户端响应时,首先会经过解码器NettyCodecAdapter.InternalDecoder,然后经过NettyServerHandler.channelRead进行请求处理。
- 服务端接收请求处理之后,接着响应结果客户端,经过NettyServerHandler.write进行结果响应,然后经过NettyCodecAdapter.InternalEncoder编码器进行编码处理,最终响应给客户端。
接下来们将其拆分为处理请求和响应结果进行分析。
处理客户端请求
解码过程
解码过程不做具体分析
NettyCodecAdapter.InternalDecoder.decode-->DubboCountCodec.decode
-->ExchangeCodec.decode
-->DubboCodec.decodeBody
经过解码得到请求对象Request。
读取请求过程
1. NettyServerHandler.channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
2. AbstractPeer.received
装饰角色,维护了closed状态
public void received(Channel ch, Object msg) throws RemotingException {// 如果通道已经关闭,则直接返回
if (closed) {
return;
}
handler.received(ch, msg);
}
3. MultiMessageHandler.received
对多消息的处理。
@Overridepublic void received(Channel channel, Object message) throws RemotingException {
//如果消息是MultiMessage类型的,做下类型转换
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
遍历发送
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
4. HeartbeatHandler.received
对心跳事件做了处理。
如果不是心跳请求,那么接下去走到AllChannelHandler的received。否则直接回复响应,不再继续往下走。
public void received(Channel channel, Object message) throws RemotingException {setReadTimestamp(channel);
//是否属于心跳的请求
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
}
return;
}
//是否属于心跳的响应
if (isHeartbeatResponse(message)) {
return;
}
handler.received(channel, message);
}
5. AllChannelHandler.received
这里io事件的派发策略和客户端接收响应结果逻辑一样,这里不再赘述。
将接收到的消息分发到线程池,线程池名称为:DubboServerHandler-10.204.246.187:20880-thread-。
提交给线程的任务是:ChannelEventRunnable
public void received(Channel channel, Object message) throws RemotingException {//获取处理线程
ExecutorService executor = getExecutorService();
try {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//提交线程失败,可以确定是线程池满了,需要将该提示响应给客户端
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
接下来的逻辑则是利用线程池去针对请求做了异步处理
6. ChannelEventRunnable.run
处理不同状态的请求,仅仅只是一个中转,针对不同状态交给指定的方法处理。
RECEIVED、CONNECTED、DISCONNECTED、SENT、CAUGHT
这里我们关注RECEIVED的处理链路。
public class ChannelEventRunnable implements Runnable {@Override
public void run() {
if (state == ChannelState.RECEIVED) {
handler.received(channel, message);
} else {
switch (state) {
case CONNECTED:
handler.connected(channel);
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
break;
case SENT:
handler.sent(channel, message);
break;
case CAUGHT:
handler.caught(channel, exception);
break;
default:
}
}
}
}
7. DecodeHandler.received
这里的解码主要是针对message对象是Decodeable对象的处理。前面以及解码为Request对象了,因此这里不会执行任何逻辑。
解码之后继续执行下一个handler的receive方法。
public void received(Channel channel, Object message) throws RemotingException {if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
8. HeaderExchangeHandler.received
处理Request请求,并异步返回Response,这里也是provider返回响应的入口
该方法分3个过程来看:
1、分请求类型进行处理:根据messgage的类型做不同的处理,例如正常的Request请求、Resopnse、telnet命令请求。
2、处理Request:继续向后执行(交给DubboProtocol的reply),得到异步调用结果。
3、返回Response: 将Request的调用id封装到Response,然后利用异步回调将结果封装到 Response 对象中,同时利用channel.send(res)方法将该结果发送给客户端(关于发送响应结果的过程会在下个部分进行分析);
@Overridepublic void received(Channel channel, Object message) throws RemotingException {
//设置时间戳
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
//获取通道
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
// 处理事件
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
// 双向通信
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
//如果是单向通信,仅向后调用指定服务即可,无需返回调用结果 handler.received(exchangeChannel, request.getData());
}
}
} elseif (message instanceof Response) {
handleResponse(channel, (Response) message);
} elseif (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
//处理请求void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
//封装resopnse(把requests的id封装到resopnse了)
Response res = new Response(req.getId(), req.getVersion());
//如果请求被破坏了, 响应异常
if (req.isBroken()) {
...
...
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// 正常,取得请求数据,也就是 RpcInvocation 对象
Object msg = req.getData();
try {
//继续向下调用 返回一个future
CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
//设置调用结果状态为成功 res.setStatus(Response.OK);
res.setResult(appResult);
} else {
//如果服务调用有异常,则设置结果状态码为服务错误 res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 发送该响应
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
9. DubboProtocol.requestHandler.reply
1、获取invoker;
关于获取invoker主要过程是:
- 取得serivce对应的key
- 根据key从exporterMap中获取对应的Export对象
- 通过export获取invoker
2、进入invoker调用链;
3、返回执行结果CompletableFuture;
public class DubboProtocol extends AbstractProtocol {private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation) message;
//获取invoker
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.completionFuture().thenApply(Function.identity());
}
}
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {...
...
String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}
return exporter.getInvoker();
}
服务端invoker调用链过程
这里大概梳理下调用链路:
ProtocolFilterWrapper.CallbackRegistrationInvoker.invoke->EchoFilter.invoke
->ClassLoaderFilter.invoke
->GenericFilter.invoke
->ContextFilter.invoke
->TraceFilter.invoke
->TimeoutFilter.invoke
->MonitorFilter.invoke
->ExceptionFilter.invoke
->InvokerWrapper.invoke
->DelegateProviderMetaDataInvoker.invoke
->AbstractProxyInvoker.invoke
关于经过的Filter这里不做说明,这里重点看下AbstractProxyInvoker.invoke方法:
public Result invoke(Invocation invocation) throws RpcException {try {
//实际就是获取到代理类,然后调用对应的服务方法
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
//把返回结果用CompletableFuture包裹
CompletableFuture<Object> future = wrapWithFuture(value, invocation);
封装AsyncRpcResult
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
future.whenComplete((obj, t) -> {
AppResponse result = new AppResponse();
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
asyncRpcResult.complete(result);
});
return asyncRpcResult;
} catch (InvocationTargetException e) {
if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
主要做了三件事情:
1. 获取代理类调用具体方法得到方法的返回;
2. 封装响应结果到CompletableFuture对象;
如果服务接口返回的就是CompletableFuture对象,则直接返回(Provider端异步执行),否则把服务接口同步返回的结果封装到CompletableFuture返回出去。
private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) {if (RpcContext.getContext().isAsyncStarted()) {
return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture();
} elseif (value instanceof CompletableFuture) {
return (CompletableFuture<Object>) value;
}
return CompletableFuture.completedFuture(value);
}
3. 将上一步得到CompletableFuture对象,通过异步通知将结果封装成AsyncRpcResult返回出去。
到这里服务端处理客户请求过程分析完成,接下来看如何将上一步得到的Response响应给客户端。
这里我用一个时序图表示从HeaderExchangeHandler到服务接口最终执行的调用链路:
响应结果给客户端
在上一部分HeaderExchangeHandler.received的接收过程中,我们知道在得到结果后会将其发送给客户端,因此我们从HeaderExchangeChannel.send方法开始分析
1. HeaderExchangeChannel.send
@Overridepublic void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion(Version.getProtocolVersion());
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}
2. NettyChannel.send
这里通过调用netty的api向channel中异步写入结果。
public void send(Object message, boolean sent) throws RemotingException {// whether the channel is closed
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.writeAndFlush(message);
if (sent) {
// wait timeout ms
timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.cause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
接着则会经过NettyServerHandler的处理器以及编码器。NettyServerHandler.write方法将结果发送给客户端,后面的链路比较简单这里不做分析。
服务端的异步总结
1、首先服务端通过Netty接收到请求之后经过解码后派发给业务线程池(DubboServerHandler-ip:port-thread-)。这里是IO线程到业务线程的一次异步。
2、(==如果服务接口返回的是CompletableFuture==)则会异步将CompletableFuture的结果封装到AsyncRpcResult。
3、(==如果服务接口返回的是CompletableFuture==)AsyncRpcResult再异步执行channel.send();
4、channel.send()发送结果也是一次异步。
欲知更多,欢迎访问:silence.work/
以上是 Dubbo 服务端接收请求过程分析 的全部内容, 来源链接: utcz.com/a/29636.html