IO积累03
多路复用器在java中的使用,被封装为selector
publicclassSocketMultiplexingSingleThreadv1{
privateServerSocketChannelserver=null;
privateSelectorselector=null;//linux 多路复用器(select poll epoll kqueue) nginx event{}
intport=9090;
publicvoidinitServer(){
try{
server=ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(newInetSocketAddress(port));
//如果在epoll模型下,open--》 epoll_create -> fd3
selector=Selector.open();// select poll *epoll 优先选择:epoll 但是可以 -D修正
//server 约等于 listen状态的 fd4
/*
register
如果:
select,poll:jvm里开辟一个数组 fd4 放进去
epoll: epoll_ctl(fd3,ADD,fd4,EPOLLIN
*/
server.register(selector,SelectionKey.OP_ACCEPT);
}catch(IOExceptione){
e.printStackTrace();
}
}
publicvoidstart(){
initServer();
System.out.println("服务器启动了。。。。。");
try{
while(true){//死循环
Set<SelectionKey>keys=selector.keys();
System.out.println(keys.size()+" size");
//1,调用多路复用器(select,poll or epoll (epoll_wait))
/*
select()是啥意思:
1,select,poll 其实 内核的select(fd4) poll(fd4)
2,epoll: 其实 内核的 epoll_wait()
*, 参数可以带时间:没有时间,0 : 阻塞,有时间设置一个超时
selector.wakeup() 结果返回0
懒加载:
其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用
*/
while(selector.select()>0){
Set<SelectionKey>selectionKeys=selector.selectedKeys();//返回的有状态的fd集合
Iterator<SelectionKey>iter=selectionKeys.iterator();
//so,管你啥多路复用器,你呀只能给我状态,我还得一个一个的去处理他们的R/W。同步好辛苦!!!!!!!!
// NIO 自己对着每一个fd调用系统调用,浪费资源,那么你看,这里是不是调用了一次select方法,知道具体的那些可以R/W了?
//幕兰,是不是很省力?
//我前边可以强调过,socket: listen 通信 R/W
while(iter.hasNext()){
SelectionKeykey=iter.next();
iter.remove();//set 不移除会重复循环处理
if(key.isAcceptable()){
//看代码的时候,这里是重点,如果要去接受一个新的连接
//语义上,accept接受连接且返回新连接的FD对吧?
//那新的FD怎么办?
//select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起
//epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间
acceptHandler(key);
}elseif(key.isReadable()){
readHandler(key);//连read 还有 write都处理了
//在当前线程,这个方法可能会阻塞 ,如果阻塞了十年,其他的IO早就没电了。。。
//所以,为什么提出了 IO THREADS
//redis 是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的
//tomcat 8,9 异步的处理方式 IO 和 处理上 解耦
}
}
}
}
}catch(IOExceptione){
e.printStackTrace();
}
}
publicvoidacceptHandler(SelectionKeykey){
try{
ServerSocketChannelssc=(ServerSocketChannel)key.channel();
SocketChannelclient=ssc.accept();//目的是调用accept接受客户端 fd7
client.configureBlocking(false);
ByteBufferbuffer=ByteBuffer.allocate(8192);//前边讲过了
//调用了register
/*
select,poll:jvm里开辟一个数组 fd7 放进去
epoll: epoll_ctl(fd3,ADD,fd7,EPOLLIN
*/
client.register(selector,SelectionKey.OP_READ,buffer);
System.out.println("-------------------------------------------");
System.out.println("新客户端:"+client.getRemoteAddress());
System.out.println("-------------------------------------------");
}catch(IOExceptione){
e.printStackTrace();
}
}
publicvoidreadHandler(SelectionKeykey){
SocketChannelclient=(SocketChannel)key.channel();
ByteBufferbuffer=(ByteBuffer)key.attachment();
buffer.clear();
intread=0;
try{
while(true){
read=client.read(buffer);
if(read>0){
buffer.flip();
while(buffer.hasRemaining()){
client.write(buffer);
}
buffer.clear();
}elseif(read==0){
break;
}else{
client.close();
break;
}
}
}catch(IOExceptione){
e.printStackTrace();
}
}
publicstaticvoidmain(String[]args){
SocketMultiplexingSingleThreadv1service=newSocketMultiplexingSingleThreadv1();
service.start();
}
}
三次握手、四次分手
服务端没有调用close,当客户端断开连接以后,客户端状态显示fin_wait,服务端显示close_wait,原因是客户端发送了一个fin断开连接的消息,服务端返回了一个fin_ack的确认款凯连接的消息,但是服务端还需要发送一个fin断开连消息,因为没有调用close,所以服务端不会发送fin消息;
服务端调用close,四次分手都完成了,客户端还会保存一会为服务端开辟的内存空间,所以客户端会显示time_wait,
原因是第三次服务端发送fin消息,客户端要返回fin_ack消息,可能会发送失败,所以要保留一会资源
缺点是消耗socket连接也就是四元组
publicclassSocketMultiplexingThreads{
privateServerSocketChannelserver=null;
privateSelectorselector1=null;
privateSelectorselector2=null;
privateSelectorselector3=null;
intport=9090;
publicvoidinitServer(){
try{
server=ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(newInetSocketAddress(port));
selector1=Selector.open();
selector2=Selector.open();
selector3=Selector.open();
server.register(selector1,SelectionKey.OP_ACCEPT);
}catch(IOExceptione){
e.printStackTrace();
}
}
publicstaticvoidmain(String[]args){
SocketMultiplexingThreadsservice=newSocketMultiplexingThreads();
service.initServer();
NioThreadT1=newNioThread(service.selector1,2);
NioThreadT2=newNioThread(service.selector2);
NioThreadT3=newNioThread(service.selector3);
T1.start();
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
T2.start();
T3.start();
System.out.println("服务器启动了。。。。。");
try{
System.in.read();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
classNioThreadextendsThread{
Selectorselector=null;
staticintselectors=0;
intid=0;
volatilestaticBlockingQueue<SocketChannel>[]queue;
staticAtomicIntegeridx=newAtomicInteger();
NioThread(Selectorsel,intn){
this.selector=sel;
this.selectors=n;
queue=newLinkedBlockingQueue[selectors];
for(inti=0;i<n;i++){
queue[i]=newLinkedBlockingQueue<>();
}
System.out.println("Boss 启动");
}
NioThread(Selectorsel){
this.selector=sel;
id=idx.getAndIncrement()%selectors;
System.out.println("worker: "+id+" 启动");
}
@Override
publicvoidrun(){
try{
while(true){
while(selector.select(10)>0){
Set<SelectionKey>selectionKeys=selector.selectedKeys();
Iterator<SelectionKey>iter=selectionKeys.iterator();
while(iter.hasNext()){
SelectionKeykey=iter.next();
iter.remove();
if(key.isAcceptable()){
acceptHandler(key);
}elseif(key.isReadable()){
readHandler(key);
}
}
}
if(!queue[id].isEmpty()){
ByteBufferbuffer=ByteBuffer.allocate(8192);
SocketChannelclient=queue[id].take();
client.register(selector,SelectionKey.OP_READ,buffer);
System.out.println("-------------------------------------------");
System.out.println("新客户端:"+client.socket().getPort()+"分配到:"+(id));
System.out.println("-------------------------------------------");
}
}
}catch(IOExceptione){
e.printStackTrace();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
publicvoidacceptHandler(SelectionKeykey){
try{
ServerSocketChannelssc=(ServerSocketChannel)key.channel();
SocketChannelclient=ssc.accept();
client.configureBlocking(false);
intnum=idx.getAndIncrement()%selectors;
queue[num].add(client);
}catch(IOExceptione){
e.printStackTrace();
}
}
publicvoidreadHandler(SelectionKeykey){
SocketChannelclient=(SocketChannel)key.channel();
ByteBufferbuffer=(ByteBuffer)key.attachment();
buffer.clear();
intread=0;
try{
while(true){
read=client.read(buffer);
if(read>0){
buffer.flip();
while(buffer.hasRemaining()){
client.write(buffer);
}
buffer.clear();
}elseif(read==0){
break;
}else{
client.close();
break;
}
}
}catch(IOExceptione){
e.printStackTrace();
}
}
}
以上是 IO积累03 的全部内容, 来源链接: utcz.com/z/518793.html