Reactor模型的JavaNIO实现

编程

  • 单线程模型
  • 单Reactor多线程模型
  • 主从Reactor多线程模型。

单线程模型

Reactor单线程模型,指的是所有的IO操作都在同一个线程上面完成,线程的职责如下:

  • 作为NIO服务端,接收客户端的TCP连接;

  • 作为NIO客户端,向服务端发起TCP连接;

  • 读取通信对端的请求或者应答消息;

  • 向通信对端发送消息请求或者应答消息。

由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处理所有IO相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor接收客户端的TCP连接请求消息,链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上进行消息解码。用户线程可以通过消息编码通过NIO线程将消息发送给客户端。

Server端

public class Reactor1 {

public static void main(String[] args) throws IOException {

Selector selector = Selector.open();

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.configureBlocking(false);

serverSocketChannel.socket().bind(new InetSocketAddress(1234));

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (selector.select() > 0) {

Set<SelectionKey> keys = selector.selectedKeys();

Iterator<SelectionKey> iterator = keys.iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

iterator.remove();

if (key.isAcceptable()) {

ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();

SocketChannel socketChannel = acceptServerSocketChannel.accept();

socketChannel.configureBlocking(false);

System.out.println("accept from "+socketChannel.socket().getInetAddress().toString());

// LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress());

socketChannel.register(selector, SelectionKey.OP_READ);

} else if (key.isReadable() && key.isValid()) {

SocketChannel socketChannel = (SocketChannel) key.channel();

ByteBuffer buffer = ByteBuffer.allocate(1024);

int count = socketChannel.read(buffer);

if (count <= 0) {

socketChannel.close();

key.cancel();

System.out.println("Received invalide data, close the connection");

//LOGGER.info("Received invalide data, close the connection");

continue;

}

System.out.println("Received message"+new String(buffer.array()));

//LOGGER.info("Received message {}", new String(buffer.array()));

}

keys.remove(key);

}

}

}

}

Client

public class Client1 {

public static void main(String[] args) throws IOException, InterruptedException {

SocketChannel socketChannel;

socketChannel = SocketChannel.open();

//socketChannel.configureBlocking(false);

socketChannel.connect(new InetSocketAddress("localhost", 1234));

Date now = new Date();

SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");//可以方便地修改日期格式

String str = dateFormat.format( now );

byte[] requst = str.getBytes();

ByteBuffer buffer = ByteBuffer.allocate(requst.length);

buffer.put(requst);

buffer.flip();

try {

while (buffer.hasRemaining()) {

socketChannel.write(buffer);

}

}catch (IOException e) {

e.printStackTrace();

}

socketChannel.close();

}

}

对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下:

一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;

当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。

为了解决这些问题,演进出了Reactor多线程模型。

单Reactor多线程模型

经典Reactor模式中,尽管一个线程可同时监控多个请求(Channel),但是所有读/写请求以及对新连接请求的处理都在同一个线程中处理,无法充分利用多CPU的优势,同时读/写操作也会阻塞对新连接请求的处理。当获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。

Reactor多线程模型的特点:

  1. 有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;

  2. 网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;

  3. 1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。

在绝大多数场景下,Reactor多线程模型都可以满足性能需求;

服务端的实现

public class Reactor2 {

private static ExecutorService pool = Executors.newFixedThreadPool(100);

public static void main(String[] args) throws IOException {

Selector selector = Selector.open();

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.configureBlocking(false);

serverSocketChannel.socket().bind(new InetSocketAddress(1234));

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while(true) {

if(selector.selectNow() < 0){

continue;

}

Set<SelectionKey> sets = selector.selectedKeys();

Iterator<SelectionKey> keys = sets.iterator();

while(keys.hasNext()) {

SelectionKey key = keys.next();

keys.remove();

if(key.isAcceptable()) {

ServerSocketChannel Serverchannel = (ServerSocketChannel) key.channel();

SocketChannel channel = Serverchannel.accept();

channel.configureBlocking(false);

System.out.println("accept from "+channel.socket().getInetAddress().toString());

channel.register(selector, SelectionKey.OP_READ);

}else if(key.isValid()&&key.isReadable()) {

pool.submit(new Processor(key));

}

}

}

}

}

class Processor implements Callable {

SelectionKey key;

public Processor(SelectionKey key) {

this.key = key;

}

@Override

public Object call() throws Exception {

ByteBuffer buffer = ByteBuffer.allocate(1024);

SocketChannel socketChannel = (SocketChannel) key.channel();

int count = socketChannel.read(buffer);

if (count < 0) {

key.cancel();

socketChannel.close();

System.out.println("Received invalide data, close the connection");

return null;

}else if(count==0) {

return null;

}

System.out.println("Received message"+new String(buffer.array()));

System.out.println("current thread"+Thread.currentThread().toString());

return null;

}

}

在极个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。

多个Reactor模式(主从Reactor)

Netty中使用的Reactor模式,引入了多Reactor,也即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。

多个Reactor模式架构图

public class MainReactor {

public static void main(String[] args) throws IOException {

Selector selector = Selector.open();

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.configureBlocking(false);

serverSocketChannel.socket().bind(new InetSocketAddress(1234));

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

int coreNum = Runtime.getRuntime().availableProcessors();

FollowerReactor[] followers = new FollowerReactor[coreNum];

for(int i=0; i<coreNum; i++) {

followers[i] = new FollowerReactor();

}

int index = 0;

while(selector.select()>0) {

Set<SelectionKey> keys = selector.selectedKeys();

for(SelectionKey key:keys) {

keys.remove(key);

if(key.isValid()&&key.isAcceptable()) {

ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();

SocketChannel socketChannel = serverSocketChannel1.accept();

socketChannel.configureBlocking(false);

System.out.println("Accept request:" + socketChannel.socket().getInetAddress());

FollowerReactor follower = followers[++index%coreNum];

follower.register(socketChannel);

//follower.wakeUp();

}

}

}

}

}

上面的代码是主Reactor,子Reactor根据前机器可用核数的两倍(与Netty默认的子Reactor个数一致)。对于每个成功连接的SocketChannel,通过round robin的方式交给不同的子Reactor。子Reactor的代码如下:

public class FollowerReactor {

private Selector selector;

private static ExecutorService service =Executors.newFixedThreadPool(

2*Runtime.getRuntime().availableProcessors());

public void register(SocketChannel socketChannel) throws ClosedChannelException {

socketChannel.register(selector, SelectionKey.OP_READ);

}

public void wakeUp() {

}

public FollowerReactor() throws IOException {

selector = Selector.open();

select();

}

public void wakeup() {

this.selector.wakeup();

}

public void select() {

service.submit(() -> {

while(true) {

if(selector.select(500)<=0) {

continue;

}

Set<SelectionKey> keys = selector.selectedKeys();

Iterator<SelectionKey> iterator = keys.iterator();

while(iterator.hasNext()) {

SelectionKey key = iterator.next();

iterator.remove();

if(key.isReadable()) {

ByteBuffer buffer = ByteBuffer.allocate(1024);

SocketChannel channel = (SocketChannel) key.channel();

int count = channel.read(buffer);

if(count<0) {

channel.close();

key.cancel();

System.out.println(channel+"->red end !");

continue;

}else if(count==0) {

System.out.println(channel+",size is 0 !");

continue;

}else{

System.out.println(channel+",message is :"+new String(buffer.array()));

}

}

}

}

});

}

}

在子Reactor中创建了一个静态的线程池,且线程池的大小为机器核数的两倍,每个字Reactor包换一个Selector实例,同事每次创建一个子Reactor都提交一个任务到线程池,阻塞到selector方法,直到新的channel注册到该Selector上,才继续执行。

参考地址

  • https://blog.csdn.net/TheLudlows/article/details/81136359

如果大家喜欢我的文章,可以关注个人订阅号。欢迎随时留言、交流。如果想加入微信群的话一起讨论的话,请加管理员简栈文化-小助手(lastpass4u),他会拉你们进群。

以上是 Reactor模型的JavaNIO实现 的全部内容, 来源链接: utcz.com/z/513988.html

回到顶部