java网络通信:异步非阻塞I/O (NIO)

java

首先是channel,是一个双向的全双工的通道,可同时读写,而输入输出流都是单工的,要么读要么写。Channel分为两大类,分别是用于网络数据的SelectableChannel和用于文件操作的FileChannel。

注意:在java NIO库中,所有的数据都是用缓冲区处理,常用的是ByteBuffer。

多路复用器Selector:

Selector会不断轮询注册在其上的Channel,如果某个Channel上又新的连接接入、读和写事件,这个Channel就处于就绪状态,通过SelectorKey可以获取就绪Channel的集合。底层使用了epoll()实现,没有最大连接句柄的限制。

服务端代码:

public class TimeServer {

public static void main(String[] args) throws IOException {

int port = 8080;

MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);

new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();

}

}

public class MultiplexerTimeServer implements Runnable {

private Selector selector;

private ServerSocketChannel servChannel;

private volatile boolean stop;

public MultiplexerTimeServer(int port) {

try {

selector = Selector.open();

servChannel = ServerSocketChannel.open();

servChannel.configureBlocking(false);//设置非阻塞模式

servChannel.socket().bind(new InetSocketAddress(port), 1024);

servChannel.register(selector, SelectionKey.OP_ACCEPT);//将Channel注册到selector,监听accept事件

System.out.println("The time server is start in port : " + port);

} catch (IOException e) {

e.printStackTrace();

System.exit(1);

}

}

public void stop() {

this.stop = true;

}

@Override

public void run() {

while (!stop) {

try {

selector.select(1000);//每隔一秒轮询一次

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> it = selectedKeys.iterator();

SelectionKey key = null;

while (it.hasNext()) {//如果有新的客户端接入

key = it.next();

it.remove();

try {

handleInput(key);//处理请求

} catch (Exception e) {

if (key != null) {

key.cancel();

if (key.channel() != null)

key.channel().close();

}

}

}

} catch (Throwable t) {

t.printStackTrace();

}

}

// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源

if (selector != null)

try {

selector.close();

} catch (IOException e) {

e.printStackTrace();

}

}

private void handleInput(SelectionKey key) throws IOException {

if (key.isValid()) {

// 处理新接入的请求消息

if (key.isAcceptable()) {

// 获取客户端的连接通道

ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

SocketChannel sc = ssc.accept();

sc.configureBlocking(false);

//将新连接注册到selector,并监听读事件

sc.register(selector, SelectionKey.OP_READ);

}

if (key.isReadable()) {

//读取通道数据写入字节缓存

SocketChannel sc = (SocketChannel) key.channel();

ByteBuffer readBuffer = ByteBuffer.allocate(1024);

int readBytes = sc.read(readBuffer);

if (readBytes > 0) {

readBuffer.flip();

byte[] bytes = new byte[readBuffer.remaining()];

readBuffer.get(bytes);//字节缓存写入字节数组

String body = new String(bytes, "UTF-8");

System.out.println("The time server receive order : " + body);

String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)

? new java.util.Date(System.currentTimeMillis()).toString()

: "BAD ORDER";

doWrite(sc, currentTime);//向socketchannel写入数据

} else if (readBytes < 0) {

// 对端链路关闭

key.cancel();

sc.close();

}

}

}

}

private void doWrite(SocketChannel channel, String response)

throws IOException {

if (response != null && response.trim().length() > 0) {

byte[] bytes = response.getBytes();

ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);

writeBuffer.put(bytes);

writeBuffer.flip();

channel.write(writeBuffer);

}

}

}

客户端:

public class TimeClient {

public static void main(String[] args) {

int port = 8080;

new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();

}

}

public class TimeClientHandle implements Runnable {

private String host;

private int port;

private Selector selector;

private SocketChannel socketChannel;

private volatile boolean stop;

public TimeClientHandle(String host, int port) {

this.host = host == null ? "127.0.0.1" : host;

this.port = port;

try {

selector = Selector.open();

socketChannel = SocketChannel.open();

socketChannel.configureBlocking(false);

} catch (IOException e) {

e.printStackTrace();

System.exit(1);

}

}

@Override

public void run() {

try {

doConnect();

} catch (IOException e) {

e.printStackTrace();

System.exit(1);

}

while (!stop) {

try {

selector.select(1000);

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> it = selectedKeys.iterator();

SelectionKey key = null;

while (it.hasNext()) {

key = it.next();

it.remove();

try {

handleInput(key);

} catch (Exception e) {

if (key != null) {

key.cancel();

if (key.channel() != null)

key.channel().close();

}

}

}

} catch (Exception e) {

e.printStackTrace();

System.exit(1);

}

}

// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源

if (selector != null)

try {

selector.close();

} catch (IOException e) {

e.printStackTrace();

}

}

private void doConnect() throws IOException {

// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答

if (!socketChannel.connect(new InetSocketAddress(host, port))) {

socketChannel.register(selector, SelectionKey.OP_READ);

doWrite(socketChannel);

} else//没有直接连接成功,不代表失败,而是说明服务器还没有返回TCP握手的应答消息,所以注册OP_CONNECT事件,监听消息。

socketChannel.register(selector, SelectionKey.OP_CONNECT); }

private void handleInput(SelectionKey key) throws IOException {

if (key.isValid()) {

// 判断是否连接成功

SocketChannel sc = (SocketChannel) key.channel();

if (key.isConnectable()) {//判断是否有连接事件,是的话,说明未连接

if (sc.finishConnect()) {//再连接

sc.register(selector, SelectionKey.OP_READ);

doWrite(sc);

} else

System.exit(1);// 连接失败,进程退出

}

if (key.isReadable()) {

ByteBuffer readBuffer = ByteBuffer.allocate(1024);

int readBytes = sc.read(readBuffer);

if (readBytes > 0) {

readBuffer.flip();

byte[] bytes = new byte[readBuffer.remaining()];

readBuffer.get(bytes);

String body = new String(bytes, "UTF-8");

System.out.println("Now is : " + body);

this.stop = true;

} else if (readBytes < 0) {

// 对端链路关闭

key.cancel();

sc.close();

}

}

}

}

private void doWrite(SocketChannel sc) throws IOException {

byte[] req = "QUERY TIME ORDER".getBytes();

ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);

writeBuffer.put(req);

writeBuffer.flip();

sc.write(writeBuffer);

if (!writeBuffer.hasRemaining())

System.out.println("Send order 2 server succeed.");

}

}

以上是 java网络通信:异步非阻塞I/O (NIO) 的全部内容, 来源链接: utcz.com/z/393609.html

回到顶部