Java学习笔记15Netty职责链Pipeline详解

编程

Java学习笔记15-Netty职责链Pipeline详解

设计模式-职责链模式

职责链模式(Chain of Responsibility Pattern)为请求创建了一个处理对象的链。

发起请求和具体处理请求的过程进行解耦:职责链上的处理者负责处理请求,客户只需要将请求发送到职责链

上即可,无需关心请求的处理细节和请求传递。

请求 --> 职责链调用 --chains--> handler-1 --> handler-2 --> handler-3 --> handler-n --> …

实现职责链模式

实现职责链模式4个要素:处理器抽象类、具体的处理器实现类、保存处理器信息、处理执行

// -----集合形式存储-----伪代码---类似tomcat中filters

// 处理器抽象类

class AbstractHandler {void doHandler(Object arg0)}

// 处理器具体实现类

class Handler1 extends AbstractHandler {assert continue;}

class Handler2 extends AbstractHandler {assert continue;}

class Handler3 extends AbstractHandler {assert continue;}

// 创建集合并存储所有处理器实例信息

List handlers = new List();

handlers.add(handler1,handler2,handler3);

// 处理请求,调用处理器()

void process(request){

for(handler in handlers){

handler.doHandler(request);

}

}

// 发起请求调用,通过责任链处理请求

call.process(request);

// -----链表形式存储-----伪代码---netty就是这种形式

// 处理器抽象类

class AbstractHandler {

AbstractHandler next;//下一个节点

void doHandler(Object arg0);//handler方法

}

// 处理器具体实现类

class Handler1 extends AbstractHandler {assert continue;}

class Handler2 extends AbstractHandler {assert continue;}

class Handler3 extends AbstractHandler {assert continue;}

// 将处理器串成链表存储

pipeline = 头[handler1->handler2->handler3]尾

// 处理请求,调用处理器(从头到尾)

void process(request){

handler = pipeline.findOne;//查找第一个

while (handler!=null){

handler.doHandler(request);

handler = handler.next();

}

}

链表形式测试代码

/**

* @Author: Wenx

* @Description:

* @Date: Created in 2019/11/23 11:38

* @Modified By:

*/

public class MyPipeline {

public static void main(String[] args) {

LinkedHandlerContext handlerContext = new LinkedHandlerContext();

handlerContext.addLast(new Handler1());

handlerContext.addLast(new Handler2());

handlerContext.addLast(new Handler1());

handlerContext.addLast(new Handler2());

// 发起请求

handlerContext.process(0);

}

}

/**

* 链表式处理器上下文,主要负责链表的维护,链表的执行

*/

class LinkedHandlerContext {

/**

* 链表头节点

*/

AbstractLinkedHandler head;

/**

* 添加处理器至末尾

*

* @param handler 处理器

*/

public void addLast(AbstractLinkedHandler handler) {

if (head == null) {

head = handler;

return;

}

AbstractLinkedHandler context = head;

while (context.next != null) {

context = context.next;

}

context.next = handler;

}

/**

* 执行处理过程

*

* @param request 请求参数

*/

public void process(Object request) {

head.handle(this, request);

}

/**

* 执行下一个处理方法

*/

public void processNext(AbstractLinkedHandler handler, Object param) {

if (handler.next != null) {

handler.next.handle(this, param);

}

}

}

/**

* 链表式处理器抽象类

*/

abstract class AbstractLinkedHandler {

/**

* 下一个节点

*/

AbstractLinkedHandler next;

/**

* 处理方法

*

* @param context 处理器上下文

* @param param 请求参数

*/

abstract void handle(LinkedHandlerContext context, Object param);

}

/**

* 处理器具体实现类

*/

class Handler1 extends AbstractLinkedHandler {

@Override

void handle(LinkedHandlerContext context, Object param) {

param = (Integer) param + 10;

System.out.println("打败野猪获得10经验,当前经验值:" + param);

context.processNext(this, param);

}

}

/**

* 处理器具体实现类

*/

class Handler2 extends AbstractLinkedHandler {

@Override

void handle(LinkedHandlerContext context, Object param) {

param = (Integer) param + 100;

System.out.println("打败恶龙获得100经验,当前经验值:" + param);

context.processNext(this, param);

}

}

Netty中的ChannelPipeline职责链

Pipeline管道保存了通道所有处理器信息。创建新channel时自动创建一个专有的pipeline。入站事件和出站操作会调用pipeline上的处理器。

Channel的I/O请求或者ChannelHandlerContext操作

ChannelPipeline

出站事件:出站处理器自顶向下的处理

Inbound Handler N

Outbound Handler 1

Inbound Handler N-1

Outbound Handler 2

ChannelHandlerContext.IN_EVT()方法调用

ChannelHandlerContext.OUT_EVT()方法调用

Inbound Handler 2

Outbound Handler N-1

Inbound Handler 1

Outbound Handler N

Socket.read()

Socket.write()

入站事件:入站处理器自底向上的处理

入站事件和出站事件

入站事件:通常指I/O线程生成了入站数据。

(通俗理解:从socket底层自己往上冒上来的事件都是入站)

比如EventLoop收到selector的OP_READ事件,入站处理器调用socketChannel.read(ByteBuffer)接收到数据后,这将导致通道的ChannelPipeline中包含的下一个中的channelRead方法被调用。

出站事件:通常是指I/O线程执行实际的输出操作。

(通俗理解:想主动往socket底层操作的事件都是出站)

比如bind方法用意是请求server socket绑定到给定的SocketAddress,这将导致通道的ChannelPipeline中包含的下一个出站处理器中的bind方法被调用。

Netty中事件的定义

inbound

入站事件

outbound

出站事件

fireChannelRegistered

channel注册事件

bind

端口绑定事件

fireChannelUnregistered

channel解除注册事件

connect

连接事件

fireChannelActive

channel活跃事件

disconnect

断开连接事件

fireChannelInactive

channel非活跃事件

close

关闭事件

fireExceptionCaught

异常事件

deregister

解除注册事件

fireUserEventTriggered

用户自定义事件

flush

刷新数据到网络事件

fireChannelRead

channel读事件

read

读事件,用于注册OP_READ到selector

fireChannelReadComplete

channel读完成事件

write

写事件

fireChannelWritabilityChanged

channel写状态变化事件

writeAndFlush

写出数据事件

Pipeline中的handler是什么

ChannelHandler:用于处理I/O事件或拦截I/O操作,并转发到ChannelPipeline中的下一个处理器。

这个顶级接口定义功能很弱,实际使用时会去实现下面两大子接口:

处理入站I/O事件的ChannelInboundHandler、处理出站I/O操作的ChannelOutboundHandler

适配器类:为了开发方便,避免所有handler去实现一边接口方法,Netty提供了简单的实现类:

ChannelInboundHandlerAdapter 处理入站I/O事件

ChannelOutboundHandlerAdapter 来处理出站I/O操作

ChannelDuplexHandler 来支持同时处理入站和出站事件

ChannelHandlerContext:实际存储在Pipeline中的对象并非ChannelHandler,而是上下文对象。

将handler包裹在上下文对象中,通过上下文对象对它所属的ChannelPipeline交互,向上或向下传递事件或者修改pipeline都是通过上下文对象。

维护Pipeline中的handler

ChannelPipeline是线程安全的,ChannelHandler可以在任何时候添加或删除。

例如,你可以在即将交换敏感信息时插入加密处理程序,并在交换后删除它。

一般操作,初始化的时候添加进去,较少删除。下面是Pipeline中管理handler的API

方法名称

描述

addFirst

最前面插入

addLast

最后面插入

addBefore

插入到指定处理器前面

addAfter

插入到指定处理器后面

remove

移除指定处理器

removeFirst

移除第一个处理器

removeLast

移除最后一个处理器

replace

替换指定的处理器

// 示例伪代码

ChannelPipeline p=…;

p.addLast("1", new InboundHandlerA());

p.addLast("2", new InboundHandlerB());

p.addLast("3", new OutboundHandlerA());

p.addLast("4", new OutboundHandlerB());

p.addLast("5", new InboundOutboundHandlerX());

handler的执行分析

当前的ChannelPipeline

ChannelDuplexHandler 5

Outbound Handler 4

Outbound Handler 3

Inbound Handler 2

Inbound Handler 1

当入站事件时,执行顺序是1、2、3、4、5

当出站事件时,执行顺序是5、4、3、2、1

在这一原则之上,ChannelPipeline在执行时会进行选择

3和4为出站处理器,因此入站事件的实际执行是1、2、5

1和2为入站处理器,因此出站事件的实际执行是5、4、3

不同的入站事件会触发handler不同的方法执行:

上下文对象中fire**开头的方法,代表入站事件传播和处理

其余的方法代表出站事件的传播和处理。

分析registered入站事件的处理

过程

描述

bind(端口):AbstractBootstrap↓

创建和初始化Channel↓

通道创建时构建一个pipeline,头尾分别是HeadContext……TailContext;init() 增加了一个ChannelInitializer,这个handler用于通道初始化,我们自己的相关初始化定义都是通过它执行的

注册到EventLoop中的Selector上

config().group().register() registered成功之后,触发ChannelInitializer.channelRegistered;初始化Handler执行一次之后,会把自己从pipeline中删除掉

ServerSocketChannel.pipeline的变化

过程

描述

新建

TailContext、HeadContext

初始化后

TailContext、ChannelInitializer、HeadContext

完成注册后

TailContext、ServerBootstrapAcceptor、LoggingHandler、HeadContext

分析bind出站事件的处理

过程1

过程2

bind(端口):AbstractBootstrap↓

创建和初始化Channel↓

注册到EventLoop中的Selector上↓

doBind0() -> channel.bind↓

ServerSocketChannel

pipeline.bind 出站操作→

完成注册后的ChannelPipeline

TailContext↓

ServerBootstrapAcceptor↓

LoggingHandler↓

HeadContext↓

HandlerContext.bind↓

AbstractUnsafe.bind↓

NioServerSocketChannel.doBind

分析accept入站事件的处理

执行过程

Netty4.1.44.Final源码位置

EventLoop轮询到accept事件 ↓

NioEventLoop:processSelectedKeys():550行;processSelectedKey():653行

NioMessageUnsafe.read ↓

AbstractNioMessageChannel:read():63行;doReadMessages:75行

NioServerSocketChannel的accept获取新链接通道 ↓

NioServerSocketChannel:doReadMessages():146行

遍历新链接传播fireChannelRead事件↓

AbstractNioMessageChannel:fireChannelRead():93行

入站操作自底向上↓

完成注册后的ChannelPipeline

↑TailContext

↑ServerBootstrapAcceptor→

为新链接通道选择EventLoop注册:ServerBootstrap#ServerBootstrapAcceptor:channelRead():201行

↑LoggingHandler

↑HeadContext

这是一个分配的过程,main Group负责accept,然后分配sub Group负责read

分析read入站事件的处理

pipeline分析的关键4要素:什么事件、有哪些处理器、哪些会被触发、执行顺序

ServerSocketChannel的ChannelPipeline

EventLoop轮询到op_read事件↓

TailContext↑

NioByteUnsafe.read 读取内容↓

EchoServerHandler↑

fireChannelRead、fireChannelReadComplete 入站操作→

HeadContext↑

小结

用户在管道中有一个或多个channelhandler来接收I/O事件(例如读取)和请求I/O操作(例如写入和关闭)。

一个典型的服务器在每个通道的管道中都有以下处理程序,但是根据协议和业务逻辑的复杂性和特征,可能会有所不同:

  • 协议解码器:将二进制数据(例如ByteBuf)转换为Java对象。
  • 协议编码器:将Java对象转换为二进制数据。
  • 业务逻辑处理程序:执行实际的业务逻辑(例如数据库访问)。

以上是 Java学习笔记15Netty职责链Pipeline详解 的全部内容, 来源链接: utcz.com/z/511193.html

回到顶部