Java NIO 模型理解

coding

1、TCP 信道(具体参数详情请参考 API 文档)

ServerSocketChannel:创建、接收、关闭、读写、阻塞

SocketChannel:创建、连接、关闭、读写、阻塞(测试连接性)

2、Selector:创建、关闭选择器

案例一:

NIOAcceptor 服务端线程

package com.warehouse.data.nio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Set;

import javax.imageio.IIOException;

/**
 * Acceptor thread of the reactor example.
 *
 * <p>Owns a non-blocking {@link ServerSocketChannel} registered for
 * {@code OP_ACCEPT} on its own {@link Selector}. Every accepted connection is
 * switched to non-blocking mode, wrapped in a {@link Processor} and handed to
 * the next reactor of the {@link NIOReactorPool}.
 *
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 9:55
 **/
public class NIOAcceptor extends Thread {

    private final Selector selector;

    private final ServerSocketChannel channel;

    private final NIOReactorPool reactorPool;

    /**
     * Opens the selector and the listening channel, binds it and registers it
     * for accept events.
     *
     * @param name        thread name
     * @param host        host name or address to bind to
     * @param port        TCP port to listen on
     * @param reactorPool pool that supplies the reactor for each new connection
     * @throws IOException if the selector or channel cannot be opened, configured,
     *                     bound or registered; anything opened so far is closed
     *                     before the exception is rethrown
     */
    public NIOAcceptor(String name, String host, int port, NIOReactorPool reactorPool) throws IOException {
        super(name);
        this.reactorPool = reactorPool;
        // Selector that multiplexes the listening channel.
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = null;
        try {
            // Channel that accepts incoming connections.
            serverChannel = ServerSocketChannel.open();
            // Non-blocking mode is required before registering with a selector.
            serverChannel.configureBlocking(false);
            // Bind the listening address.
            serverChannel.bind(new InetSocketAddress(host, port));
            // Subscribe to accept events.
            serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        } catch (IOException | RuntimeException e) {
            // Do not leak the channel/selector when e.g. the port is already in use.
            closeQuietly(serverChannel, e);
            closeQuietly(this.selector, e);
            throw e;
        }
        this.channel = serverChannel;
        System.out.println("start NIOAcceptor thread server.");
    }

    /**
     * Polls the selector forever. Acceptable keys trigger {@link #accept(Selector)};
     * any other selected key is cancelled. I/O errors from the selector are
     * printed and the loop continues.
     */
    @Override
    public void run() {
        final Selector selector = this.selector;
        for (; ; ) {
            try {
                // Wait until an event arrives or 1000 ms elapse.
                selector.select(1000L);
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                try {
                    for (SelectionKey key : selectionKeySet) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept(selector);
                        } else {
                            // Single-reactor variant (disabled): a readable key would be
                            // dispatched here to the Processor attached to it, e.g.
                            //   Processor processor = (Processor) key.attachment();
                            //   try { processor.process(key); }
                            //   catch (IOException e) { processor.close(); }
                            key.cancel();
                        }
                    }
                } finally {
                    // Selected keys are never removed by the selector itself.
                    selectionKeySet.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Accepts one pending connection, if there is one, and posts it to the next
     * reactor of the pool.
     *
     * @param selector the acceptor's selector; only needed by the disabled
     *                 single-reactor variant and otherwise unused
     */
    public void accept(Selector selector) {
        SocketChannel socketChannel = null;
        try {
            socketChannel = this.channel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            System.out.println("accept client success.");
            socketChannel.configureBlocking(false);
            // Single-reactor variant (disabled):
            //   SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
            //   selectionKey.attach(new Processor(socketChannel));
            // Multi-reactor variant: hand the connection to one of the pooled reactors.
            NIOReactor reactor = this.reactorPool.getNextReactor();
            reactor.postRegister(new Processor(socketChannel));
        } catch (IOException | RuntimeException e) {
            // A failed hand-off must neither leak the socket nor end the accept loop.
            e.printStackTrace();
            closeQuietly(socketChannel, e);
        }
    }

    /**
     * Closes {@code resource} if it is non-null, attaching any failure to
     * {@code primary} as a suppressed exception instead of throwing it.
     */
    private static void closeQuietly(AutoCloseable resource, Throwable primary) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception closeFailure) {
            primary.addSuppressed(closeFailure);
        }
    }
}

NIOReactorPool线程池

package com.warehouse.data.nio;

import java.io.IOException;

/**
 * Fixed-size pool of {@link NIOReactor}s handed out in round-robin order.
 *
 * <p>All reactors are created and started by the constructor.
 *
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:50
 **/
public class NIOReactorPool {

    private final NIOReactor[] nioReactors;

    /** Index of the reactor that the next call to {@link #getNextReactor()} returns. */
    private volatile int nextReactor;

    /**
     * Creates and starts {@code poolSize} reactors named {@code name-0},
     * {@code name-1}, ...
     *
     * @param name     common name prefix of the reactors
     * @param poolSize number of reactors; must be at least 1
     * @throws IOException              if a reactor cannot be created
     * @throws IllegalArgumentException if {@code poolSize} is less than 1
     */
    public NIOReactorPool(String name, int poolSize) throws IOException {
        if (poolSize < 1) {
            throw new IllegalArgumentException("poolSize must be >= 1, got " + poolSize);
        }
        nioReactors = new NIOReactor[poolSize];
        for (int i = 0; i < poolSize; i++) {
            NIOReactor nioReactor = new NIOReactor(name + "-" + i);
            nioReactors[i] = nioReactor;
            nioReactor.startup();
        }
    }

    /**
     * Returns the reactors in round-robin order, starting with index 0 and
     * wrapping around after the last one.
     *
     * <p>The counter update is a separate read and write of a volatile field and
     * therefore not atomic; in this file the method is only called from
     * {@code NIOAcceptor.accept}.
     *
     * @return the next reactor; never {@code null}
     */
    public NIOReactor getNextReactor() {
        int index = nextReactor;
        // Wrap with modulo so the index can never reach nioReactors.length.
        nextReactor = (index + 1) % nioReactors.length;
        return nioReactors[index];
    }
}

NIOReactor线程

package com.warehouse.data.nio;

import java.io.IOException;

import java.nio.channels.ClosedChannelException;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.util.Set;

import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * A reactor: one selector loop, run on its own thread, that performs the read
 * handling for every {@link Processor} posted to it.
 *
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:51
 **/
public final class NIOReactor {

    private final String name;

    private final RWLoop reactorR;

    /**
     * @param name reactor name; the worker thread is named {@code name + "-RW"}
     * @throws IOException if the reactor's selector cannot be opened
     */
    public NIOReactor(String name) throws IOException {
        this.name = name;
        this.reactorR = new RWLoop();
    }

    /** Starts the thread that runs this reactor's selector loop. */
    public void startup() {
        new Thread(reactorR, this.name + "-RW").start();
    }

    /**
     * Queues {@code processor} for registration with this reactor's selector and
     * wakes the selector so the loop picks it up without waiting for the select
     * timeout.
     *
     * @param processor connection handler to register
     */
    public void postRegister(Processor processor) {
        this.reactorR.registerQueue.offer(processor);
        this.reactorR.selector.wakeup();
    }

    /**
     * The selector loop. It is only ever executed through the thread created in
     * {@link NIOReactor#startup()}, so it is a plain {@link Runnable}.
     */
    private static final class RWLoop implements Runnable {

        private final Selector selector;

        /** Processors posted by other threads that still have to be registered. */
        private final ConcurrentLinkedQueue<Processor> registerQueue;

        RWLoop() throws IOException {
            this.selector = Selector.open();
            this.registerQueue = new ConcurrentLinkedQueue<Processor>();
        }

        /**
         * Loops forever: select (up to 1000 ms), register queued processors,
         * dispatch the selected keys, then clear the selected-key set.
         */
        @Override
        public void run() {
            final Selector selector = this.selector;
            Set<SelectionKey> selectionKeySet = null;
            for (; ; ) {
                try {
                    selector.select(1000L);
                    register(selector);
                    selectionKeySet = selector.selectedKeys();
                    for (SelectionKey key : selectionKeySet) {
                        dispatch(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (selectionKeySet != null) {
                        selectionKeySet.clear();
                    }
                }
            }
        }

        /**
         * Handles one selected key: a valid key with an attached processor gets
         * its read event processed; any other key is cancelled. A failure closes
         * the processor so one broken connection cannot stop the loop.
         */
        private void dispatch(SelectionKey key) {
            Object att = key.attachment();
            Processor processor = null;
            try {
                if (att != null && key.isValid()) {
                    processor = (Processor) att;
                    if (key.isReadable()) {
                        processor.process(key);
                    }
                    // Write readiness is not handled: Processor.register only
                    // subscribes to OP_READ.
                } else {
                    // Drop keys that are no longer valid or have no handler.
                    key.cancel();
                }
            } catch (Throwable e) {
                // Deliberately broad: whatever goes wrong is confined to this connection.
                e.printStackTrace();
                if (processor != null) {
                    processor.close();
                }
            }
        }

        /** Registers every queued processor with {@code selector}. */
        private void register(Selector selector) {
            Processor processor;
            while ((processor = this.registerQueue.poll()) != null) {
                try {
                    processor.register(selector);
                } catch (ClosedChannelException e) {
                    // The peer channel was closed before it could be registered.
                    e.printStackTrace();
                }
            }
        }
    }
}

Processor事件处理

package com.warehouse.data.nio;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.ClosedChannelException;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

/**
 * Per-connection handler: registers its {@link SocketChannel} for read events
 * and prints whatever the peer sends to standard output.
 *
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 10:49
 **/
public class Processor {

    private SocketChannel channel;

    /** Key returned by {@link #register(Selector)}; {@code null} until registered. */
    private SelectionKey selectionKey;

    /**
     * @param channel the connection this processor handles
     */
    public Processor(SocketChannel channel) {
        this.channel = channel;
    }

    /**
     * Registers the channel with {@code selector} for {@code OP_READ}, attaching
     * this processor to the resulting key.
     *
     * @param selector selector to register with
     * @throws ClosedChannelException if the channel is already closed
     */
    public void register(Selector selector) throws ClosedChannelException {
        selectionKey = this.channel.register(selector, SelectionKey.OP_READ, this);
    }

    /**
     * Reads up to 1024 bytes from the key's channel and prints exactly the bytes
     * received (decoded with the platform default charset). When the peer has
     * closed the connection (read returns -1) this processor is closed instead,
     * so the selector does not keep reporting the dead channel as readable.
     *
     * <p>The work could be delegated to a thread pool.
     *
     * @param key the readable key whose channel is read
     * @throws IOException if the read fails
     */
    public void process(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int count = socketChannel.read(byteBuffer);
        if (count < 0) {
            // End of stream: the peer closed the connection.
            close();
            return;
        }
        if (count > 0) {
            // Decode only the bytes actually read, not the whole backing array.
            System.out.println(new String(byteBuffer.array(), 0, count));
        }
    }

    /**
     * Cancels the selection key (if the processor was registered) and closes the
     * channel. Safe to call repeatedly and before {@link #register(Selector)}.
     * A failure to close the channel is printed, not thrown.
     */
    public void close() {
        if (this.selectionKey != null) {
            this.selectionKey.cancel();
        }
        if (this.channel != null) {
            try {
                this.channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Server服务端

package com.warehouse.data.nio;

import java.io.IOException;

/**
 * Entry point of the example server: builds the reactor pool and starts the
 * acceptor thread listening on 127.0.0.1:8888.
 *
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:03
 **/
public class Server {

    /**
     * Starts a pool of five reactors and one acceptor that feeds them.
     *
     * @param args ignored
     * @throws IOException if the reactor pool or the acceptor cannot be created
     */
    public static void main(String[] args) throws IOException {
        // Five reactor threads share the accepted connections.
        final int reactorCount = 5;
        final NIOReactorPool pool = new NIOReactorPool("NIOReactor-IO", reactorCount);

        new NIOAcceptor("NIOAcceptor-IO", "127.0.0.1", 8888, pool).start();
    }
}

Client客户端

package com.warehouse.data.nio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

/**
 * Minimal non-blocking client: connects to 127.0.0.1:8888, sends
 * {@code "hello world"} and closes the connection.
 *
 * package com.warehouse.data.nio
 *
 * @author zli [liz@yyft.com]
 * @version v1.0
 * @create 2017-03-28 11:00
 **/
public class Client {

    /**
     * Connects, sends the greeting and releases the selector and channel.
     *
     * @param args ignored
     * @throws IOException if the connection cannot be established or the write fails
     */
    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_CONNECT);

            // A non-blocking connect may complete immediately (returns true) or
            // stay pending (returns false); the message must be sent either way.
            boolean connected = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
            while (!connected) {
                // Wait for the connect event, then complete the handshake. Writing
                // before finishConnect() succeeds raises NotYetConnectedException.
                selector.select(1000L);
                selector.selectedKeys().clear();
                connected = socketChannel.finishConnect();
            }

            ByteBuffer payload = ByteBuffer.wrap("hello world".getBytes());
            // A non-blocking write may accept only part of the buffer.
            while (payload.hasRemaining()) {
                socketChannel.write(payload);
            }
        }
    }
}

资料:具体可以参考《Netty 权威指南》

http://www.cnblogs.com/good-temper/p/5003892.html

以上是 javanio模型理解 的全部内容, 来源链接: utcz.com/z/509131.html

回到顶部