javanio模型理解
1、tcp信道,具体参数详情参考api
ServerSocketChannel:创建、接收、关闭、读写、阻塞
SocketChannel:创建、连接、关闭、读写、阻塞(测试连接性)
2、Selector:创建、关闭选择器
案例一:
NIOAccepter服务端线程
package com.warehouse.data.nio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import javax.imageio.IIOException;
/**
* ${DESCRIPTION}
* package com.warehouse.data.nio
*
* @author zli [liz@yyft.com]
* @version v1.0
* @create 2017-03-28 9:55
**/
public class NIOAcceptor extends Thread {
private Selector selector;
private ServerSocketChannel channel;
private NIOReactorPool reactorPool;
public NIOAcceptor(String name, String host, int port, NIOReactorPool reactorPool) throws IOException {
super(name);
this.reactorPool = reactorPool;
//获取一个管理通道器
this.selector = Selector.open();
//获取一个接受连接socket
this.channel = ServerSocketChannel.open();
//设置非阻塞
this.channel.configureBlocking(false);
//绑定端口
this.channel.bind(new InetSocketAddress(host, port));
//注册事件
this.channel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("start NIOAcceptor thread server.");
}
@Override
public void run() {
final Selector selector = this.selector;
//轮询
for (; ; ) {
try {
//阻塞,直到select事件到达
selector.select(1000L);
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
try {
for (SelectionKey key : selectionKeySet) {
if (key.isValid() && key.isAcceptable()) {
accept(selector);
}/* else if (key.isValid() && key.isReadable()) {
Processor processor = (Processor) key.attachment();
try{
processor.process(key);
}catch (IOException e){
processor.close();
}
} */else {
key.cancel();
}
}
} finally {
selectionKeySet.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void accept(Selector selector) {
SocketChannel socketChannel = null;
try {
System.out.println("accept client success.");
socketChannel = this.channel.accept();
socketChannel.configureBlocking(false);
//单个Reactor线程处理
//SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
//selectionKey.attach(new Processor(socketChannel));
//多个Reactor线程处理
NIOReactor reactor = this.reactorPool.getNextReactor();
reactor.postRegister(new Processor(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
NIOReactorPool线程池
package com.warehouse.data.nio;import java.io.IOException;
/**
* ${DESCRIPTION}
* package com.warehouse.data.nio
*
* @author zli [liz@yyft.com]
* @version v1.0
* @create 2017-03-28 11:50
**/
public class NIOReactorPool {
private final NIOReactor[] nioReactors;
private volatile int nextReactor;
public NIOReactorPool(String name, int poolSize) throws IOException {
nioReactors = new NIOReactor[poolSize];
for (int i = 0; i < poolSize; i++) {
NIOReactor nioReactor = new NIOReactor(name + "-" + i);
nioReactors[i] = nioReactor;
nioReactor.startup();
}
}
public NIOReactor getNextReactor() {
int i = ++nextReactor;
if (i > nioReactors.length) {
i = nextReactor = 0;
}
return nioReactors[i];
}
}
NIOReactor线程
package com.warehouse.data.nio;import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* ${DESCRIPTION}
* package com.warehouse.data.nio
*
* @author zli [liz@yyft.com]
* @version v1.0
* @create 2017-03-28 11:51
**/
public final class NIOReactor {
private final String name;
private final RWThread reactorR;
public NIOReactor(String name) throws IOException {
this.name = name;
this.reactorR = new RWThread();
}
public void startup() {
new Thread(reactorR, this.name + "-RW").start();
}
public void postRegister(Processor processor) {
this.reactorR.registerQueue.offer(processor);
this.reactorR.selector.wakeup();
}
private final class RWThread extends Thread {
private final Selector selector;
private final ConcurrentLinkedQueue<Processor> registerQueue;
public RWThread() throws IOException {
this.selector = Selector.open();
this.registerQueue = new ConcurrentLinkedQueue<Processor>();
}
@Override
public void run() {
final Selector selector = this.selector;
Set<SelectionKey> selectionKeySet = null;
for (; ; ) {
try {
selector.select(1000L);
register(selector);
selectionKeySet = selector.selectedKeys();
for (SelectionKey key : selectionKeySet) {
Object att = key.attachment();
Processor processor = null;
try {
if (att != null && key.isValid()) {
processor = (Processor) att;
if (key.isReadable()) {
processor.process(key);
}
if (key.isWritable()) {
}
} else {
key.channel();
}
} catch (Throwable e) {
e.printStackTrace();
if(processor != null){
processor.close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (selectionKeySet != null) {
selectionKeySet.clear();
}
}
}
}
private void register(Selector selector) {
if (this.registerQueue.isEmpty()) {
return;
}
Processor processor = null;
while ((processor = this.registerQueue.poll()) != null) {
try {
processor.register(selector);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
}
}
}
Processor事件处理
package com.warehouse.data.nio;import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
* ${DESCRIPTION}
* package com.warehouse.data.nio
*
* @author zli [liz@yyft.com]
* @version v1.0
* @create 2017-03-28 10:49
**/
public class Processor {
private SocketChannel channel;
private SelectionKey selectionKey;
public Processor(SocketChannel channel) {
this.channel = channel;
}
public void register(Selector selector) throws ClosedChannelException {
selectionKey = this.channel.register(selector, SelectionKey.OP_READ, this);
}
public void process(SelectionKey key) throws IOException {
//可以采用线程池处理
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
}
public void close(){
if(this.channel != null){
try {
this.channel.close();
this.selectionKey.cancel();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Server服务端
package com.warehouse.data.nio;import java.io.IOException;
/**
* ${DESCRIPTION}
* package com.warehouse.data.nio
*
* @author zli [liz@yyft.com]
* @version v1.0
* @create 2017-03-28 11:03
**/
public class Server {
public static void main(String[] args) throws IOException {
//5个reactor线程
NIOReactorPool nioReactorPool = new NIOReactorPool("NIOReactor-IO", 5);
NIOAcceptor nioAcceptor = new NIOAcceptor("NIOAcceptor-IO", "127.0.0.1", 8888, nioReactorPool);
nioAcceptor.start();
}
}
Client客户端
package com.warehouse.data.nio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
* ${DESCRIPTION}
* package com.warehouse.data.nio
*
* @author zli [liz@yyft.com]
* @version v1.0
* @create 2017-03-28 11:00
**/
public class Client {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));
if(socketChannel.isConnectionPending()){
//要finishConnect,否则会出现NotYetConnectedException异常
socketChannel.finishConnect();
socketChannel.write(ByteBuffer.wrap(new String("hello world").getBytes()));
}
}
}
资料:具体可以参考netty权威指南
http://www.cnblogs.com/good-temper/p/5003892.html
以上是 javanio模型理解 的全部内容, 来源链接: utcz.com/z/509131.html