深入理解NIO

编程

BIO

所谓的BIO 就是最传统的 socket链接嘛,比如:

int port = 4343; //端口号

// Socket 服务器端(简单的发送信息)

Thread sThread = new Thread(new Runnable() {

@Override

public void run() {

try {

ServerSocket serverSocket = new ServerSocket(port);

while (true) {

// 等待连接

Socket socket = serverSocket.accept();

Thread sHandlerThread = new Thread(new Runnable() {

@Override

public void run() {

try (PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {

printWriter.println("hello world!");

printWriter.flush();

} catch (IOException e) {

e.printStackTrace();

}

}

});

sHandlerThread.start();

}

} catch (IOException e) {

e.printStackTrace();

}

}

});

sThread.start();

// Socket 客户端(接收信息并打印)

try (Socket cSocket = new Socket(InetAddress.getLocalHost(), port)) {

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(cSocket.getInputStream()));

bufferedReader.lines().forEach(s -> System.out.println("客户端:" + s));

} catch (UnknownHostException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

流程图大概如下

所以看上会有101个线程, 一个accept线程以及100个链接线程。

这不算复杂。

NIO

实际上我们不太会自己写NIO的代码,而是会使用netty这样优秀的开源库。

图例逻辑表示大概是这样

首先每个客户端都会对应一个SocketChannel的通道(一般通道通过buffer读写数据),然后这些socketchannel会被注册进入selector。

selector相当于一个管理器,他会轮询所有socketchannel,查询所有可用socketchannel,然后去处理这些 socketchannel.

服务端

public class NIOServerSocket {

//存储SelectionKey的队列

private static List<SelectionKey> writeQueue = new ArrayList<SelectionKey>();

private static Selector selector = null;

//添加SelectionKey到队列

public static void addWriteQueue(SelectionKey key){

synchronized (writeQueue) {

writeQueue.add(key);

//唤醒主线程

selector.wakeup();

}

}

public static void main(String[] args) throws IOException {

// 1.创建ServerSocketChannel

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 2.绑定端口

serverSocketChannel.bind(new InetSocketAddress(60000));

// 3.设置为非阻塞 nio只能使用非阻塞模式

serverSocketChannel.configureBlocking(false);

// 4.创建通道选择器

selector = Selector.open();

/*

* 5.注册事件类型

*

* sel:通道选择器

* ops:事件类型 ==>SelectionKey:包装类,包含事件类型和通道本身。四个常量类型表示四种事件类型

* SelectionKey.OP_ACCEPT 获取报文 SelectionKey.OP_CONNECT 连接

* SelectionKey.OP_READ 读 SelectionKey.OP_WRITE 写

*/

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {

System.out.println("服务器端:正在监听60000端口");

// 6.获取可用I/O通道,获得有多少可用的通道

int num = selector.select();

if (num > 0) { // 判断是否存在可用的通道

// 获得所有的keys

Set<SelectionKey> selectedKeys = selector.selectedKeys();

// 使用iterator遍历所有的keys

Iterator<SelectionKey> iterator = selectedKeys.iterator();

// 迭代遍历当前I/O通道

while (iterator.hasNext()) {

// 获得当前key

SelectionKey key = iterator.next();

// 调用iterator的remove()方法,并不是移除当前I/O通道,标识当前I/O通道已经处理。

iterator.remove();

// 判断事件类型,做对应的处理

if (key.isAcceptable()) {

ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();

SocketChannel socketChannel = ssChannel.accept();

System.out.println("处理请求:"+ socketChannel.getRemoteAddress());

// 获取客户端的数据

// 设置非阻塞状态

socketChannel.configureBlocking(false);

// 注册到selector(通道选择器)

socketChannel.register(selector, SelectionKey.OP_READ);

} else if (key.isReadable()) {

System.out.println("读事件");

//取消读事件的监控

key.cancel();

//调用读操作工具类

NIOHandler.read(key);

} else if (key.isWritable()) {

System.out.println("写事件");

//取消读事件的监控

key.cancel();

//调用写操作工具类

NIOHandler.write(key);

}

}

}else{

synchronized (writeQueue) {

while(writeQueue.size() > 0){

SelectionKey key = writeQueue.remove(0);

//注册写事件

SocketChannel channel = (SocketChannel) key.channel();

Object attachment = key.attachment();

channel.register(selector, SelectionKey.OP_WRITE,attachment);

}

}

}

}

}

}

消息处理器,这里会使用多线程来处理消息,线程数量一般和服务器cpu核心数相关,主要目的是为了发挥cpu所有核心的性能。

public class NIOHandler {

//构造线程池

private static ExecutorService executorService = Executors.newFixedThreadPool(10);

public static void read(final SelectionKey key){

//获得线程并执行

executorService.submit(new Runnable() {

@Override

public void run() {

try {

SocketChannel readChannel = (SocketChannel) key.channel();

// I/O读数据操作

ByteBuffer buffer = ByteBuffer.allocate(1024);

ByteArrayOutputStream baos = new ByteArrayOutputStream();

int len = 0;

while (true) {

buffer.clear();

len = readChannel.read(buffer);

if (len == -1) break;

buffer.flip();

while (buffer.hasRemaining()) {

baos.write(buffer.get());

}

}

System.out.println("服务器端接收到的数据:"+ new String(baos.toByteArray()));

//将数据添加到key中

key.attach(baos);

//将注册写操作添加到队列中

NIOServerSocket.addWriteQueue(key);

} catch (IOException e) {

e.printStackTrace();

}

}

});

}

public static void write(final SelectionKey key) {

//拿到线程并执行

executorService.submit(new Runnable() {

@Override

public void run() {

try {

// 写操作

SocketChannel writeChannel = (SocketChannel) key.channel();

//拿到客户端传递的数据

ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();

System.out.println("客户端发送来的数据:"+new String(attachment.toByteArray()));

ByteBuffer buffer = ByteBuffer.allocate(1024);

String message = "你好,我是服务器!!";

buffer.put(message.getBytes());

buffer.flip();

writeChannel.write(buffer);

writeChannel.close();

} catch (IOException e) {

e.printStackTrace();

}

}

});

}

}

客户端

public class NIOClientSocket {

public static void main(String[] args) throws IOException {

//使用线程模拟用户 并发访问

for (int i = 0; i < 1; i++) {

new Thread(){

public void run() {

try {

//1.创建SocketChannel

SocketChannel socketChannel=SocketChannel.open();

//2.连接服务器

socketChannel.connect(new InetSocketAddress("localhost",60000));

//写数据

String msg="我是客户端"+Thread.currentThread().getId();

ByteBuffer buffer=ByteBuffer.allocate(1024);

buffer.put(msg.getBytes());

buffer.flip();

socketChannel.write(buffer);

socketChannel.shutdownOutput();

//读数据

ByteArrayOutputStream bos = new ByteArrayOutputStream();

int len = 0;

while (true) {

buffer.clear();

len = socketChannel.read(buffer);

if (len == -1)

break;

buffer.flip();

while (buffer.hasRemaining()) {

bos.write(buffer.get());

}

}

System.out.println("客户端收到:"+new String(bos.toByteArray()));

socketChannel.close();

} catch (IOException e) {

e.printStackTrace();

}

};

}.start();

}

}

}

以上代码中有一段很奇怪的代码,也就是writequeue,这么写的主要原因是

OP_WRITE事件的就绪条件并不是发生在调用channel的write方法之后,而是在当底层缓冲区有空闲空间的情况下。因为写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理现场就会一直占用着CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。

其实,在大部分情况下,我们直接调用channel的write方法写数据就好了,没必要都用OP_WRITE事件。那么OP_WRITE事件主要是在什么情况下使用的了?

其实OP_WRITE事件主要是在发送缓冲区空间满的情况下使用的。如:

while (buffer.hasRemaining()) {

int len = socketChannel.write(buffer);

if (len == 0) {

selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);

selector.wakeup();

break;

}

}

当buffer还有数据,但缓冲区已经满的情况下,socketChannel.write(buffer)会返回已经写出去的字节数,此时为0。那么这个时候我们就需要注册OP_WRITE事件,这样当缓冲区又有空闲空间的时候就会触发OP_WRITE事件,这是我们就可以继续将没写完的数据继续写出了。

而且在写完后,一定要记得将OP_WRITE事件注销:

selectionKey.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);

注意,这里在修改了interest之后调用了wakeup();方法是为了唤醒被堵塞的selector方法,这样当while中判断selector返回的是0时,会再次调用selector.select()。而selectionKey的interest是在每次selector.select()操作的时候注册到系统进行监听的,所以在selector.select()调用之后修改的interest需要在下一次selector.select()调用才会生效。

所以对于NIO而言,100个链接并不会有100个线程,而是会有cpu核数+1个线程,或者cpu 核数x2 +1这样

参考

http:///article/265871

https://blog.csdn.net/zxcc1314/article/details/80918665

https://www.jianshu.com/p/1af407c043cb

以上是 深入理解NIO 的全部内容, 来源链接: utcz.com/z/512901.html

回到顶部