多路复用器支持高并发demo
public static void main(String[] args) {
//只接受注册
SelectorThreadGroup boss = new SelectorThreadGroup(3);
//接受客户端的读写
SelectorThreadGroup worker = new SelectorThreadGroup(3);
//主中添加子
boss.setWorker(worker);
boss.bind(9999);
boss.bind(8888);
boss.bind(6666);
boss.bind(7777);
}
/**
* 每线程对应一个selector
* 多线程情况下,该主机,该程序的并发客户端被分配到多个selector上
* 注意,每个客户端,只绑定到其中一个selector
* 其实不会有交互问题
*/
public class SelectThread implements Runnable {
Selector selector = null;
LinkedBlockingQueue<Channel> queue = new LinkedBlockingQueue<>();
//标识线程属于哪个组
SelectorThreadGroup _group;
public SelectThread(SelectorThreadGroup group){
try {
//每个线程创建一个多路复用器
selector = Selector.open();
this._group = group;
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true){
try {
/**
* selector.select(); 底层已经调用epoll_wait方法,返回了链表保存在内存中,是有状态的文件描述符
* */
//System.out.println("1 " + Thread.currentThread().getName() + "--" + selector.keys().size());
int num = selector.select(); //阻塞 wakeup()
//System.out.println("2 " + Thread.currentThread().getName() + "--" + selector.keys().size());
//2,处理selectkeys
if(num > 0){
/**
* 获取内存汇总的链表直接使用,因为是文件描述符,
* 所以可以找到相应的channel的状态,根据状态执行相应操作
*
* */
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
//从selector的红黑树移除
iter.remove();
//判断每个文件描述符代表的状态
if(key.isAcceptable()){
acceptHandle(key);
}else if(key.isReadable()){
readHandler(key);
}else if(key.isWritable()){
}
}
}
//3,处理一些task : listen client 被打断后的处理
if(!queue.isEmpty()){
Channel c = queue.take();
if(c instanceof ServerSocketChannel){
ServerSocketChannel ser = (ServerSocketChannel) c;
ser.register(selector,SelectionKey.OP_ACCEPT);
}else if(c instanceof SocketChannel){
SocketChannel cli = (SocketChannel) c;
ByteBuffer buf = ByteBuffer.allocateDirect(4096);
cli.register(selector,SelectionKey.OP_READ,buf);
}
}
} catch (IOException e) {
e.printStackTrace();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void readHandler(SelectionKey key) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel client = (SocketChannel)key.channel();
buffer.clear();
while (true){
try {
int num = client.read(buffer);
if(num > 0){
buffer.flip();
while (buffer.hasRemaining()){
client.write(buffer);
}
buffer.clear();
}else if(num == 0){
break;
}else if(num < 0){
//客户端断开了
System.out.println("client : " + client.getRemoteAddress() + " closed..");
key.channel();
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void acceptHandle(SelectionKey key) {
/*
* key是文件描述符,标识socket连接
* 因为是服务端,所以使用channel获取ServerSocketChannel类似于ServerSocket
* accept获取当前文件描述符所表示的客户端
* */
ServerSocketChannel server = (ServerSocketChannel) key.channel();
try {
SocketChannel client = server.accept();
client.configureBlocking(false);
//该线程只是接受,读写操作在其他线程,所以需要把客户端注册到其他线程的selector中
_group.nextSelectorV3(client);
} catch (IOException e) {
e.printStackTrace();
}
}
public void setWorket(SelectorThreadGroup stgWorket) {
this._group = stgWorket;
}
}
public class SelectorThreadGroup {
SelectThread[] selects;
ServerSocketChannel server = null;
AtomicInteger xid = new AtomicInteger(0);
//代表工作组,因为工作中中也包含selects,所以工作组中的线程需要使用stg.selects
SelectorThreadGroup stg = this;
public void setWorker(SelectorThreadGroup stg){
this.stg = stg;
}
//num线程数
public SelectorThreadGroup(int num){
//线程数组,每个线程中有一个多路复用器
selects = new SelectThread[num];
for(int i = 0;i < num;i++){
//每个线程属于哪个组
selects[i] = new SelectThread(this);
new Thread(selects[i]).start();
}
}
//服务端绑定端口
public void bind(int port) {
try {
//nio服务端绑定端口
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
//使用多路复用器,需要把server注册到多路复用器中,接收客户端请求交给多路复用器,如accetp
nextSelectorV3(server);
} catch (IOException e) {
e.printStackTrace();
}
}
public void nextSelector(Channel c){
SelectThread st = next();
st.queue.add(c);
st.selector.wakeup();
}
public void nextSelectorV3(Channel c){
//如果是boos组,那么只需要去除boos中的线程,worket组的线程,需要使用工作组,然后在获取
if(c instanceof ServerSocketChannel){
SelectThread st = next();
st.queue.add(c);
st.setWorket(stg);
st.selector.wakeup();
}else if(c instanceof SocketChannel){
SelectThread st = nextV3();
//1,通过队列传递数据 消息
st.queue.add(c);
//2,通过打断阻塞,让对应的线程去自己在打断后完成注册selector
st.selector.wakeup();
}
SelectThread st = nextV3();
st.queue.add(c);
st.selector.wakeup();
}
private SelectThread next(){
int index = xid.incrementAndGet() % selects.length;
return selects[index];
}
private SelectThread nextV3(){
int index = xid.incrementAndGet() % stg.selects.length;
return stg.selects[index];
}
}
以上是 多路复用器支持高并发demo 的全部内容, 来源链接: utcz.com/z/518894.html