深入理解NIO
BIO
所谓的BIO 就是最传统的 socket链接嘛,比如:
int port = 4343; //端口号// Socket 服务器端(简单的发送信息)
Thread sThread = new Thread(new Runnable() {
@Override
public void run() {
try {
ServerSocket serverSocket = new ServerSocket(port);
while (true) {
// 等待连接
Socket socket = serverSocket.accept();
Thread sHandlerThread = new Thread(new Runnable() {
@Override
public void run() {
try (PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {
printWriter.println("hello world!");
printWriter.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
});
sHandlerThread.start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
});
sThread.start();
// Socket 客户端(接收信息并打印)
try (Socket cSocket = new Socket(InetAddress.getLocalHost(), port)) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(cSocket.getInputStream()));
bufferedReader.lines().forEach(s -> System.out.println("客户端:" + s));
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
流程图大概如下
所以看上会有101个线程, 一个accept线程以及100个链接线程。
这不算复杂。
NIO
实际上我们不太会自己写NIO的代码,而是会使用netty这样优秀的开源库。
图例逻辑表示大概是这样
首先每个客户端都会对应一个SocketChannel的通道(一般通道通过buffer读写数据),然后这些socketchannel会被注册进入selector。
selector相当于一个管理器,他会轮询所有socketchannel,查询所有可用socketchannel,然后去处理这些 socketchannel.
服务端
public class NIOServerSocket { //存储SelectionKey的队列
private static List<SelectionKey> writeQueue = new ArrayList<SelectionKey>();
private static Selector selector = null;
//添加SelectionKey到队列
public static void addWriteQueue(SelectionKey key){
synchronized (writeQueue) {
writeQueue.add(key);
//唤醒主线程
selector.wakeup();
}
}
public static void main(String[] args) throws IOException {
// 1.创建ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2.绑定端口
serverSocketChannel.bind(new InetSocketAddress(60000));
// 3.设置为非阻塞 nio只能使用非阻塞模式
serverSocketChannel.configureBlocking(false);
// 4.创建通道选择器
selector = Selector.open();
/*
* 5.注册事件类型
*
* sel:通道选择器
* ops:事件类型 ==>SelectionKey:包装类,包含事件类型和通道本身。四个常量类型表示四种事件类型
* SelectionKey.OP_ACCEPT 获取报文 SelectionKey.OP_CONNECT 连接
* SelectionKey.OP_READ 读 SelectionKey.OP_WRITE 写
*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("服务器端:正在监听60000端口");
// 6.获取可用I/O通道,获得有多少可用的通道
int num = selector.select();
if (num > 0) { // 判断是否存在可用的通道
// 获得所有的keys
Set<SelectionKey> selectedKeys = selector.selectedKeys();
// 使用iterator遍历所有的keys
Iterator<SelectionKey> iterator = selectedKeys.iterator();
// 迭代遍历当前I/O通道
while (iterator.hasNext()) {
// 获得当前key
SelectionKey key = iterator.next();
// 调用iterator的remove()方法,并不是移除当前I/O通道,标识当前I/O通道已经处理。
iterator.remove();
// 判断事件类型,做对应的处理
if (key.isAcceptable()) {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = ssChannel.accept();
System.out.println("处理请求:"+ socketChannel.getRemoteAddress());
// 获取客户端的数据
// 设置非阻塞状态
socketChannel.configureBlocking(false);
// 注册到selector(通道选择器)
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
System.out.println("读事件");
//取消读事件的监控
key.cancel();
//调用读操作工具类
NIOHandler.read(key);
} else if (key.isWritable()) {
System.out.println("写事件");
//取消读事件的监控
key.cancel();
//调用写操作工具类
NIOHandler.write(key);
}
}
}else{
synchronized (writeQueue) {
while(writeQueue.size() > 0){
SelectionKey key = writeQueue.remove(0);
//注册写事件
SocketChannel channel = (SocketChannel) key.channel();
Object attachment = key.attachment();
channel.register(selector, SelectionKey.OP_WRITE,attachment);
}
}
}
}
}
}
消息处理器,这里会使用多线程来处理消息,线程数量一般和服务器cpu核心数相关,主要目的是为了发挥cpu所有核心的性能。
public class NIOHandler { //构造线程池
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void read(final SelectionKey key){
//获得线程并执行
executorService.submit(new Runnable() {
@Override
public void run() {
try {
SocketChannel readChannel = (SocketChannel) key.channel();
// I/O读数据操作
ByteBuffer buffer = ByteBuffer.allocate(1024);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int len = 0;
while (true) {
buffer.clear();
len = readChannel.read(buffer);
if (len == -1) break;
buffer.flip();
while (buffer.hasRemaining()) {
baos.write(buffer.get());
}
}
System.out.println("服务器端接收到的数据:"+ new String(baos.toByteArray()));
//将数据添加到key中
key.attach(baos);
//将注册写操作添加到队列中
NIOServerSocket.addWriteQueue(key);
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public static void write(final SelectionKey key) {
//拿到线程并执行
executorService.submit(new Runnable() {
@Override
public void run() {
try {
// 写操作
SocketChannel writeChannel = (SocketChannel) key.channel();
//拿到客户端传递的数据
ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();
System.out.println("客户端发送来的数据:"+new String(attachment.toByteArray()));
ByteBuffer buffer = ByteBuffer.allocate(1024);
String message = "你好,我是服务器!!";
buffer.put(message.getBytes());
buffer.flip();
writeChannel.write(buffer);
writeChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
客户端
public class NIOClientSocket { public static void main(String[] args) throws IOException {
//使用线程模拟用户 并发访问
for (int i = 0; i < 1; i++) {
new Thread(){
public void run() {
try {
//1.创建SocketChannel
SocketChannel socketChannel=SocketChannel.open();
//2.连接服务器
socketChannel.connect(new InetSocketAddress("localhost",60000));
//写数据
String msg="我是客户端"+Thread.currentThread().getId();
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.put(msg.getBytes());
buffer.flip();
socketChannel.write(buffer);
socketChannel.shutdownOutput();
//读数据
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int len = 0;
while (true) {
buffer.clear();
len = socketChannel.read(buffer);
if (len == -1)
break;
buffer.flip();
while (buffer.hasRemaining()) {
bos.write(buffer.get());
}
}
System.out.println("客户端收到:"+new String(bos.toByteArray()));
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
};
}.start();
}
}
}
以上代码中有一段很奇怪的代码,也就是writequeue,这么写的主要原因是
OP_WRITE事件的就绪条件并不是发生在调用channel的write方法之后,而是在当底层缓冲区有空闲空间的情况下。因为写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理现场就会一直占用着CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。
其实,在大部分情况下,我们直接调用channel的write方法写数据就好了,没必要都用OP_WRITE事件。那么OP_WRITE事件主要是在什么情况下使用的了?
其实OP_WRITE事件主要是在发送缓冲区空间满的情况下使用的。如:
while (buffer.hasRemaining()) { int len = socketChannel.write(buffer);
if (len == 0) {
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
selector.wakeup();
break;
}
}
当buffer还有数据,但缓冲区已经满的情况下,socketChannel.write(buffer)会返回已经写出去的字节数,此时为0。那么这个时候我们就需要注册OP_WRITE事件,这样当缓冲区又有空闲空间的时候就会触发OP_WRITE事件,这是我们就可以继续将没写完的数据继续写出了。
而且在写完后,一定要记得将OP_WRITE事件注销:
selectionKey.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
注意,这里在修改了interest之后调用了wakeup();方法是为了唤醒被堵塞的selector方法,这样当while中判断selector返回的是0时,会再次调用selector.select()。而selectionKey的interest是在每次selector.select()操作的时候注册到系统进行监听的,所以在selector.select()调用之后修改的interest需要在下一次selector.select()调用才会生效。
所以对于NIO而言,100个链接并不会有100个线程,而是会有cpu核数+1个线程,或者cpu 核数x2 +1这样
参考
http:///article/265871
https://blog.csdn.net/zxcc1314/article/details/80918665
https://www.jianshu.com/p/1af407c043cb
以上是 深入理解NIO 的全部内容, 来源链接: utcz.com/z/512901.html