IO积累03

编程

多路复用器在java中的使用,被封装为selector

publicclassSocketMultiplexingSingleThreadv1{

privateServerSocketChannelserver=null;

privateSelectorselector=null;//linux 多路复用器(select poll epoll kqueue) nginx event{}

intport=9090;

publicvoidinitServer(){

try{

server=ServerSocketChannel.open();

server.configureBlocking(false);

server.bind(newInetSocketAddress(port));

//如果在epoll模型下,open--》 epoll_create -> fd3

selector=Selector.open();// select poll *epoll 优先选择:epoll 但是可以 -D修正

//server 约等于 listen状态的 fd4

/*

register

如果:

select,poll:jvm里开辟一个数组 fd4 放进去

epoll: epoll_ctl(fd3,ADD,fd4,EPOLLIN

*/

server.register(selector,SelectionKey.OP_ACCEPT);

}catch(IOExceptione){

e.printStackTrace();

}

}

publicvoidstart(){

initServer();

System.out.println("服务器启动了。。。。。");

try{

while(true){//死循环

Set<SelectionKey>keys=selector.keys();

System.out.println(keys.size()+" size");

//1,调用多路复用器(select,poll or epoll (epoll_wait))

/*

select()是啥意思:

1,select,poll 其实 内核的select(fd4) poll(fd4)

2,epoll: 其实 内核的 epoll_wait()

*, 参数可以带时间:没有时间,0 : 阻塞,有时间设置一个超时

selector.wakeup() 结果返回0

懒加载:

其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用

*/

while(selector.select()>0){

Set<SelectionKey>selectionKeys=selector.selectedKeys();//返回的有状态的fd集合

Iterator<SelectionKey>iter=selectionKeys.iterator();

//so,管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!!

// NIO 自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了?

//幕兰,是不是很省力?

//我前边可以强调过,socket: listen 通信 R/W

while(iter.hasNext()){

SelectionKeykey=iter.next();

iter.remove();//set 不移除会重复循环处理

if(key.isAcceptable()){

//看代码的时候,这里是重点,如果要去接受一个新的连接

//语义上,accept接受连接且返回新连接的FD对吧?

//那新的FD怎么办?

//select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起

//epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间

acceptHandler(key);

}elseif(key.isReadable()){

readHandler(key);//连read 还有 write都处理了

//在当前线程,这个方法可能会阻塞 ,如果阻塞了十年,其他的IO早就没电了。。。

//所以,为什么提出了 IO THREADS

//redis 是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的

//tomcat 8,9 异步的处理方式 IO 和 处理上 解耦

}

}

}

}

}catch(IOExceptione){

e.printStackTrace();

}

}

publicvoidacceptHandler(SelectionKeykey){

try{

ServerSocketChannelssc=(ServerSocketChannel)key.channel();

SocketChannelclient=ssc.accept();//目的是调用accept接受客户端 fd7

client.configureBlocking(false);

ByteBufferbuffer=ByteBuffer.allocate(8192);//前边讲过了

//调用了register

/*

select,poll:jvm里开辟一个数组 fd7 放进去

epoll: epoll_ctl(fd3,ADD,fd7,EPOLLIN

*/

client.register(selector,SelectionKey.OP_READ,buffer);

System.out.println("-------------------------------------------");

System.out.println("新客户端:"+client.getRemoteAddress());

System.out.println("-------------------------------------------");

}catch(IOExceptione){

e.printStackTrace();

}

}

publicvoidreadHandler(SelectionKeykey){

SocketChannelclient=(SocketChannel)key.channel();

ByteBufferbuffer=(ByteBuffer)key.attachment();

buffer.clear();

intread=0;

try{

while(true){

read=client.read(buffer);

if(read>0){

buffer.flip();

while(buffer.hasRemaining()){

client.write(buffer);

}

buffer.clear();

}elseif(read==0){

break;

}else{

client.close();

break;

}

}

}catch(IOExceptione){

e.printStackTrace();

}

}

publicstaticvoidmain(String[]args){

SocketMultiplexingSingleThreadv1service=newSocketMultiplexingSingleThreadv1();

service.start();

}

}

三次握手、四次分手
服务端没有调用close,当客户端断开连接以后,客户端状态显示fin_wait,服务端显示close_wait,原因是客户端发送了一个fin断开连接的消息,服务端返回了一个fin_ack的确认款凯连接的消息,但是服务端还需要发送一个fin断开连消息,因为没有调用close,所以服务端不会发送fin消息;

服务端调用close,四次分手都完成了,客户端还会保存一会为服务端开辟的内存空间,所以客户端会显示time_wait,
原因是第三次服务端发送fin消息,客户端要返回fin_ack消息,可能会发送失败,所以要保留一会资源

缺点是消耗socket连接也就是四元组
 

publicclassSocketMultiplexingThreads{

privateServerSocketChannelserver=null;

privateSelectorselector1=null;

privateSelectorselector2=null;

privateSelectorselector3=null;

intport=9090;

publicvoidinitServer(){

try{

server=ServerSocketChannel.open();

server.configureBlocking(false);

server.bind(newInetSocketAddress(port));

selector1=Selector.open();

selector2=Selector.open();

selector3=Selector.open();

server.register(selector1,SelectionKey.OP_ACCEPT);

}catch(IOExceptione){

e.printStackTrace();

}

}

publicstaticvoidmain(String[]args){

SocketMultiplexingThreadsservice=newSocketMultiplexingThreads();

service.initServer();

NioThreadT1=newNioThread(service.selector1,2);

NioThreadT2=newNioThread(service.selector2);

NioThreadT3=newNioThread(service.selector3);

T1.start();

try{

Thread.sleep(1000);

}catch(InterruptedExceptione){

e.printStackTrace();

}

T2.start();

T3.start();

System.out.println("服务器启动了。。。。。");

try{

System.in.read();

}catch(IOExceptione){

e.printStackTrace();

}

}

}

classNioThreadextendsThread{

Selectorselector=null;

staticintselectors=0;

intid=0;

volatilestaticBlockingQueue<SocketChannel>[]queue;

staticAtomicIntegeridx=newAtomicInteger();

NioThread(Selectorsel,intn){

this.selector=sel;

this.selectors=n;

queue=newLinkedBlockingQueue[selectors];

for(inti=0;i<n;i++){

queue[i]=newLinkedBlockingQueue<>();

}

System.out.println("Boss 启动");

}

NioThread(Selectorsel){

this.selector=sel;

id=idx.getAndIncrement()%selectors;

System.out.println("worker: "+id+" 启动");

}

@Override

publicvoidrun(){

try{

while(true){

while(selector.select(10)>0){

Set<SelectionKey>selectionKeys=selector.selectedKeys();

Iterator<SelectionKey>iter=selectionKeys.iterator();

while(iter.hasNext()){

SelectionKeykey=iter.next();

iter.remove();

if(key.isAcceptable()){

acceptHandler(key);

}elseif(key.isReadable()){

readHandler(key);

}

}

}

if(!queue[id].isEmpty()){

ByteBufferbuffer=ByteBuffer.allocate(8192);

SocketChannelclient=queue[id].take();

client.register(selector,SelectionKey.OP_READ,buffer);

System.out.println("-------------------------------------------");

System.out.println("新客户端:"+client.socket().getPort()+"分配到:"+(id));

System.out.println("-------------------------------------------");

}

}

}catch(IOExceptione){

e.printStackTrace();

}catch(InterruptedExceptione){

e.printStackTrace();

}

}

publicvoidacceptHandler(SelectionKeykey){

try{

ServerSocketChannelssc=(ServerSocketChannel)key.channel();

SocketChannelclient=ssc.accept();

client.configureBlocking(false);

intnum=idx.getAndIncrement()%selectors;

queue[num].add(client);

}catch(IOExceptione){

e.printStackTrace();

}

}

publicvoidreadHandler(SelectionKeykey){

SocketChannelclient=(SocketChannel)key.channel();

ByteBufferbuffer=(ByteBuffer)key.attachment();

buffer.clear();

intread=0;

try{

while(true){

read=client.read(buffer);

if(read>0){

buffer.flip();

while(buffer.hasRemaining()){

client.write(buffer);

}

buffer.clear();

}elseif(read==0){

break;

}else{

client.close();

break;

}

}

}catch(IOExceptione){

e.printStackTrace();

}

}

}

以上是 IO积累03 的全部内容, 来源链接: utcz.com/z/518793.html

回到顶部