Java AIO 异步IO应用实例

java

项目地址:https://github.com/windwant/windwant-demo/tree/master/io-service

Server:

package org.windwant.io.aio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.AsynchronousChannelGroup;

import java.nio.channels.AsynchronousServerSocketChannel;

import java.nio.channels.AsynchronousSocketChannel;

import java.nio.channels.CompletionHandler;

import java.nio.charset.Charset;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadLocalRandom;

/**

* AsynchronousServerSocketChannel

*/

public class AIOServer implements Runnable{

private int port = 8889;

private int threadSize = 10;

protected AsynchronousChannelGroup asynchronousChannelGroup;

protected AsynchronousServerSocketChannel serverChannel;

public AIOServer(int port, int threadSize) {

this.port = port;

this.threadSize = threadSize;

init();

}

private void init(){

try {

asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 10);

serverChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);

serverChannel.bind(new InetSocketAddress(port));

System.out.println("listening on port: " + port);

} catch (IOException e) {

e.printStackTrace();

}

}

public void run() {

try{

if(serverChannel == null) return;

serverChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {

final ByteBuffer echoBuffer = ByteBuffer.allocateDirect(1024);

public void completed(AsynchronousSocketChannel result, AIOServer attachment) {

System.out.println("==============================================================");

System.out.println("server process begin ...");

try {

System.out.println("client host: " + result.getRemoteAddress());

echoBuffer.clear();

result.read(echoBuffer).get();

echoBuffer.flip();

System.out.println("received : " + Charset.defaultCharset().decode(echoBuffer));

int random = ThreadLocalRandom.current().nextInt(5);

printProcess(random);

System.out.println("server deal request execute: " + random + "s");

String msg = "server test msg-" + Math.random();

System.out.println("server send data: " + msg);

result.write(ByteBuffer.wrap(msg.getBytes()));

System.out.println("server process end ...");

} catch (IOException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (ExecutionException e) {

e.printStackTrace();

} finally {

attachment.serverChannel.accept(attachment, this);// 监听新的请求,递归调用。

}

}

public void failed(Throwable exc, AIOServer attachment) {

System.out.println("received failed");

exc.printStackTrace();

attachment.serverChannel.accept(attachment, this);

}

});

System.in.read();

}catch (Exception e){

e.printStackTrace();

}

}

private void printProcess(int s) throws InterruptedException {

String dot = "";

for (int i = 0; i < s; i++) {

Thread.sleep(1000);

dot += ".";

System.out.println(dot);

}

}

public static void main(String[] args) throws IOException {

new Thread(new AIOServer(8989, 19)).start();

}

}

Client:

package org.windwant.aio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.AsynchronousSocketChannel;

import java.nio.channels.CompletionHandler;

/**

* AsynchronousSocketChannel

*/

public class AIOClient implements Runnable{

private AsynchronousSocketChannel client;

private String host;

private int port;

public AIOClient(String host, int port) throws IOException {

this.client = AsynchronousSocketChannel.open();

this.host = host;

this.port = port;

}

public static void main(String[] args) {

try {

new Thread(new AIOClient("127.0.0.1", 8989)).start();

System.in.read();

} catch (IOException e) {

e.printStackTrace();

}

}

public void run() {

client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {

public void completed(Void result, Object attachment) {

String msg = "client test msg-" + Math.random();

client.write(ByteBuffer.wrap(msg.getBytes()));

System.out.println("client send data:" + msg);

}

public void failed(Throwable exc, Object attachment) {

System.out.println("client send field...");

}

});

final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

client.read(byteBuffer, this, new CompletionHandler<Integer, Object>() {

public void completed(Integer result, Object attachment) {

System.out.println(result);

System.out.println("client read data: " + new String(byteBuffer.array()));

}

public void failed(Throwable exc, Object attachment) {

System.out.println("read faield");

}

});

}

}

2017-12-11  改造client: AsynchronousChannelGroup

package org.windwant.io.aio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.AsynchronousChannelGroup;

import java.nio.channels.AsynchronousSocketChannel;

import java.nio.channels.CompletionHandler;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

/**

* AsynchronousSocketChannel

*/

public class AIOClient implements Runnable{

private AsynchronousChannelGroup group; //异步通道组 封装处理异步通道的网络IO操作

private String host;

private int port;

public AIOClient(String host, int port) {

this.host = host;

this.port = port;

initGroup();

}

private void initGroup(){

if(group == null) {

try {

group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newFixedThreadPool(5), 5); //使用固定线程池实例化组

} catch (IOException e) {

e.printStackTrace();

}

}

}

private void send(){

try {

//异步流式socket通道 open方法创建 并绑定到组 group

final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);

//连接

client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {

public void completed(Void result, Object attachment) {

String msg = "client test msg-" + Math.random();

client.write(ByteBuffer.wrap(msg.getBytes()));

System.out.println(Thread.currentThread().getName() + " client send data:" + msg);

final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

client.read(byteBuffer, this, new CompletionHandler<Integer, Object>() {

public void completed(Integer result, Object attachment) {

System.out.println(Thread.currentThread().getName() + " client read data: " + new String(byteBuffer.array()));

try {

byteBuffer.clear();

if (client != null) client.close();

} catch (IOException e) {

e.printStackTrace();

}

}

public void failed(Throwable exc, Object attachment) {

System.out.println("read faield");

}

});

}

public void failed(Throwable exc, Object attachment) {

System.out.println("client send field...");

}

});

} catch (IOException e) {

e.printStackTrace();

}

}

public void run() {

for (int i = 0; i < 100; i++) {

send();

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

@Override

protected void finalize() throws Throwable {

super.finalize();

group.awaitTermination(10000, TimeUnit.SECONDS);

}

public static void main(String[] args) {

try {

new Thread(new AIOClient("127.0.0.1", 8989)).start();

System.in.read();

} catch (IOException e) {

e.printStackTrace();

}

}

}

以上是 Java AIO 异步IO应用实例 的全部内容, 来源链接: utcz.com/z/391712.html

回到顶部