多路复用器支持高并发demo

编程

public static void main(String[] args) {

    //只接受注册
    SelectorThreadGroup boss = new SelectorThreadGroup(3);
    //接受客户端的读写
    SelectorThreadGroup worker = new SelectorThreadGroup(3);

    //主中添加子
    boss.setWorker(worker);

    boss.bind(9999);
    boss.bind(8888);
    boss.bind(6666);
    boss.bind(7777);

}

 

/**
* 每线程对应一个selector
* 多线程情况下,该主机,该程序的并发客户端被分配到多个selector
* 注意,每个客户端,只绑定到其中一个selector
* 其实不会有交互问题
*/
public class SelectThread implements Runnable {

    Selector selector = null;

    LinkedBlockingQueue<Channel> queue = new LinkedBlockingQueue<>();
    //标识线程属于哪个组
    SelectorThreadGroup _group;

    public SelectThread(SelectorThreadGroup group){

        try {

            //每个线程创建一个多路复用器
            selector = Selector.open();
            this._group = group;

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {

        while (true){
            try {
                /**
                 * selector.select(); 底层已经调用epoll_wait方法,返回了链表保存在内存中,是有状态的文件描述符
                 * */
                //System.out.println("1    " + Thread.currentThread().getName() + "--" + selector.keys().size());
                int num = selector.select(); //阻塞  wakeup()
                //System.out.println("2    " + Thread.currentThread().getName() + "--" + selector.keys().size());
                //2,处理selectkeys
                if(num > 0){
                    /**
                     * 获取内存汇总的链表直接使用,因为是文件描述符,
                     * 所以可以找到相应的channel的状态,根据状态执行相应操作
                     *
                     * */
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = keys.iterator();
                    while (iter.hasNext()){
                        SelectionKey key = iter.next();
                        //selector的红黑树移除
                        iter.remove();
                        //判断每个文件描述符代表的状态
                        if(key.isAcceptable()){

                            acceptHandle(key);

                        }else if(key.isReadable()){

                            readHandler(key);

                        }else if(key.isWritable()){

                        }
                    }
                }

                //3,处理一些task :  listen  client  被打断后的处理
                if(!queue.isEmpty()){

                    Channel c = queue.take();

                    if(c instanceof ServerSocketChannel){

                        ServerSocketChannel ser = (ServerSocketChannel) c;

                        ser.register(selector,SelectionKey.OP_ACCEPT);

                    }else if(c instanceof SocketChannel){

                        SocketChannel cli = (SocketChannel) c;

                        ByteBuffer buf = ByteBuffer.allocateDirect(4096);

                        cli.register(selector,SelectionKey.OP_READ,buf);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    private void readHandler(SelectionKey key) {

        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel client = (SocketChannel)key.channel();
        buffer.clear();
        while (true){
            try {
                int num = client.read(buffer);
                if(num > 0){
                    buffer.flip();
                    while (buffer.hasRemaining()){
                        client.write(buffer);
                    }
                    buffer.clear();
                }else if(num == 0){
                    break;
                }else if(num < 0){
                    //客户端断开了
                    System.out.println("client : " + client.getRemoteAddress() + " closed..");
                    key.channel();
                    break;
                }

            } catch (IOException e) {
                e.printStackTrace();
            }

        }

    }

    private void acceptHandle(SelectionKey key) {

        /*
        * key是文件描述符,标识socket连接
        * 因为是服务端,所以使用channel获取ServerSocketChannel类似于ServerSocket
        * accept获取当前文件描述符所表示的客户端
        * */
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false);

            //该线程只是接受,读写操作在其他线程,所以需要把客户端注册到其他线程的selector
            _group.nextSelectorV3(client);

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public void setWorket(SelectorThreadGroup stgWorket) {

        this._group = stgWorket;
    }
}

 

public class SelectorThreadGroup {

    SelectThread[] selects;

    ServerSocketChannel server = null;

    AtomicInteger xid = new AtomicInteger(0);

    //代表工作组,因为工作中中也包含selects,所以工作组中的线程需要使用stg.selects
    SelectorThreadGroup stg = this;

    public void setWorker(SelectorThreadGroup stg){
        this.stg = stg;
    }

    //num线程数
    public SelectorThreadGroup(int num){

        //线程数组,每个线程中有一个多路复用器
        selects = new SelectThread[num];

        for(int i = 0;i < num;i++){

            //每个线程属于哪个组
            selects[i] = new SelectThread(this);

            new Thread(selects[i]).start();
        }
    }

    //服务端绑定端口
    public void bind(int port) {

        try {

            //nio服务端绑定端口
            server = ServerSocketChannel.open();

            server.configureBlocking(false);

            server.bind(new InetSocketAddress(port));

            //使用多路复用器,需要把server注册到多路复用器中,接收客户端请求交给多路复用器,如accetp
            nextSelectorV3(server);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void nextSelector(Channel c){

        SelectThread st = next();

        st.queue.add(c);

        st.selector.wakeup();
    }

    public void nextSelectorV3(Channel c){

        //如果是boos组,那么只需要去除boos中的线程,worket组的线程,需要使用工作组,然后在获取
        if(c instanceof ServerSocketChannel){

            SelectThread st = next();

            st.queue.add(c);

            st.setWorket(stg);

            st.selector.wakeup();

        }else if(c instanceof SocketChannel){

            SelectThread st = nextV3();

            //1,通过队列传递数据 消息
            st.queue.add(c);

            //2,通过打断阻塞,让对应的线程去自己在打断后完成注册selector
            st.selector.wakeup();

        }

        SelectThread st = nextV3();

        st.queue.add(c);

        st.selector.wakeup();
    }

    private SelectThread next(){

        int index = xid.incrementAndGet() % selects.length;

        return selects[index];

    }

    private SelectThread nextV3(){

        int index = xid.incrementAndGet() % stg.selects.length;

        return stg.selects[index];

    }
}

以上是 多路复用器支持高并发demo 的全部内容, 来源链接: utcz.com/z/518894.html

回到顶部