netty总结服务端启动流程

编程

给自己做个总结(连接服务端初始化以及处理):

1. NioEventLoop 用来正真处理io连接的

2.NioEventLoopGroup 可以简单的理解为处理组一共两个,一个是接受连接的,一个是处理连接的,里面的chooser即是NioEventLoop数组

服务端初始化流程

入口
ChannelFuture f = b.bind(8888).sync();

   

public ChannelFuture bind(int inetPort) {

        return this.bind(new InetSocketAddress(inetPort));

    }

    

    public ChannelFuture bind(SocketAddress localAddress) {

        this.validate();

        if (localAddress == null) {

            throw new NullPointerException("localAddress");

        } else {

            return this.doBind(localAddress);

        }

    }

    private ChannelFuture doBind(final SocketAddress localAddress) {

        //初始化即注册

        final ChannelFuture regFuture = this.initAndRegister();

        final Channel channel = regFuture.channel();

        if (regFuture.cause() != null) {

            return regFuture;

        } else if (regFuture.isDone()) {

            ChannelPromise promise = channel.newPromise();

            //绑定

            doBind0(regFuture, channel, localAddress, promise);

            return promise;

        } else {

            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);

            regFuture.addListener(new ChannelFutureListener() {

                public void operationComplete(ChannelFuture future) throws Exception {

                    Throwable cause = future.cause();

                    if (cause != null) {

                        promise.setFailure(cause);

                    } else {

                        promise.registered();

                        AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);

                    }

                }

            });

            return promise;

        }

    }

    先说注册以及初始化

    final ChannelFuture initAndRegister() {

        Channel channel = null;

        try {

            //这里的channelFactory是启动代码的 b.channel(NioServerSocketChannel.class)传的通过反射实例化的对象

            channel = this.channelFactory.newChannel();

            //初始化

            this.init(channel);

        } catch (Throwable var3) {

            if (channel != null) {

                channel.unsafe().closeForcibly();

                return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);

            }

            return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);

        }

        //注册selector

        ChannelFuture regFuture = this.config().group().register(channel);

        if (regFuture.cause() != null) {

            if (channel.isRegistered()) {

                channel.close();

            } else {

                channel.unsafe().closeForcibly();

            }

        }

        return regFuture;

    }    

    

    //设置ChannelOptions ChannelAttrs     配置服务端Channel的相关属性

    //设置ChildOptions ChildAttrs    配置每个新连接的Channel的相关属性

    //Config handler    配置服务端pipeline

    //add ServerBootstrapAcceptor    添加连接器,对accpet接受到的新连接进行处理,添加一个nio线程

    void init(Channel channel) throws Exception {

        Map<ChannelOption<?>, Object> options = this.options0();

        synchronized(options) {

            setChannelOptions(channel, options, logger);

        }

        Map<AttributeKey<?>, Object> attrs = this.attrs0();

        synchronized(attrs) {

            Iterator var5 = attrs.entrySet().iterator();

            while(true) {

                if (!var5.hasNext()) {

                    break;

                }

                Entry<AttributeKey<?>, Object> e = (Entry)var5.next();

                AttributeKey<Object> key = (AttributeKey)e.getKey();

                channel.attr(key).set(e.getValue());

            }

        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = this.childGroup;

        final ChannelHandler currentChildHandler = this.childHandler;

        Map var9 = this.childOptions;

        final Entry[] currentChildOptions;

        synchronized(this.childOptions) {

            currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size()));

        }

        var9 = this.childAttrs;

        final Entry[] currentChildAttrs;

        synchronized(this.childAttrs) {

            currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size()));

        }

        p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {

            public void initChannel(final Channel ch) throws Exception {

                final ChannelPipeline pipeline = ch.pipeline();

                ChannelHandler handler = ServerBootstrap.this.config.handler();

                if (handler != null) {

                    pipeline.addLast(new ChannelHandler[]{handler});

                }

                ch.eventLoop().execute(new Runnable() {

                    public void run() {

                        //这里是在有了新连接接入的时候生成处理NioEventLoop的后续会讲到

                        pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});

                    }

                });

            }

        }});

    }

    接下来注册selector
   

public ChannelFuture register(Channel channel) {

        return this.next().register(channel);

    }

    public EventLoop next() {

        return (EventLoop)super.next();

    }

    public EventExecutor next() {

        return this.chooser.next();

    }


    从上面的代码我们可以看到其实调用的    NioEventLoop(父类) 的register方法   SingleThreadEventLoop
   

 public ChannelFuture register(Channel channel) {

        return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));

    }

    public ChannelFuture register(ChannelPromise promise) {

        ObjectUtil.checkNotNull(promise, "promise");

        promise.channel().unsafe().register(this, promise);

        return promise;

    }

    最终调用的是io.netty.channel.AbstractChannel.AbstractUnsafe
   

public final void register(EventLoop eventLoop, final ChannelPromise promise) {

        if (eventLoop == null) {

            throw new NullPointerException("eventLoop");

        } else if (AbstractChannel.this.isRegistered()) {

            promise.setFailure(new IllegalStateException("registered to an event loop already"));

        } else if (!AbstractChannel.this.isCompatible(eventLoop)) {

            promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));

        } else {

            //赋值 将NioEventLoop与channel绑定

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {

                //注册流程

                this.register0(promise);

            } else {

                try {

                    eventLoop.execute(new Runnable() {

                        public void run() {

                            AbstractUnsafe.this.register0(promise);

                        }

                    });

                } catch (Throwable var4) {

                    AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);

                    this.closeForcibly();

                    AbstractChannel.this.closeFuture.setClosed();

                    this.safeSetFailure(promise, var4);

                }

            }

        }

    }    

    private void register0(ChannelPromise promise) {

        try {

            if (!promise.setUncancellable() || !this.ensureOpen(promise)) {

                return;

            }

            boolean firstRegistration = this.neverRegistered;

            //这里

            AbstractChannel.this.doRegister();

            this.neverRegistered = false;

            AbstractChannel.this.registered = true;

            //执行这个方法的原因是 当pipeline调用的时候,如果该 pipeline 还没有注册到这个 eventLoop 上,则将这个包装过 handler 的 context 放进变量 pendingHandlerCallbackHead 中,然后在此执行

            AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();

            this.safeSetSuccess(promise);

            AbstractChannel.this.pipeline.fireChannelRegistered();

            if (AbstractChannel.this.isActive()) {

                if (firstRegistration) {

                    AbstractChannel.this.pipeline.fireChannelActive();

                } else if (AbstractChannel.this.config().isAutoRead()) {

                    this.beginRead();

                }

            }

        } catch (Throwable var3) {

            this.closeForcibly();

            AbstractChannel.this.closeFuture.setClosed();

            this.safeSetFailure(promise, var3);

        }

    }

    

    

    protected void doRegister() throws Exception {

        boolean selected = false;

        for (;;) {

            try {

                //把channel注册到eventLoop上的selector上 jdk

                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

                return;

            } catch (CancelledKeyException e) {

                if (!selected) {

                    // Force the Selector to select now as the "canceled" SelectionKey may still be

                    // cached and not removed because no Select.select(..) operation was called yet.

                    eventLoop().selectNow();

                    selected = true;

                } else {

                    // We forced a select operation on the selector before but the SelectionKey is still cached

                    // for whatever reason. JDK bug ?

                    throw e;

                }

            }

        }

    }


    
    
    开始绑定
    这时候服务端处理的NioEventLoop就已经在运行了  channel.eventLoop().execute()就已经触发了run方法
   

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {

            public void run() {

                if (regFuture.isSuccess()) {

                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

                } else {

                    promise.setFailure(regFuture.cause());

                }

            }

        });

    }

    public void execute(Runnable task) {

        if (task == null) {

            throw new NullPointerException("task");

        } else {

            boolean inEventLoop = this.inEventLoop();

            if (inEventLoop) {

                this.addTask(task);

            } else {

                //新起一个线程

                this.startThread();

                this.addTask(task);

                if (this.isShutdown() && this.removeTask(task)) {

                    reject();

                }

            }

            if (!this.addTaskWakesUp && this.wakesUpForTask(task)) {

                this.wakeup(inEventLoop);

            }

        }

    }

    private void startThread() {

        if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {

            try {

                this.doStartThread();

            } catch (Throwable var2) {

                STATE_UPDATER.set(this, 1);

                PlatformDependent.throwException(var2);

            }

        }

    }    

    

    private void doStartThread() {

        assert this.thread == null;

        this.executor.execute(new Runnable() {

            public void run() {

                SingleThreadEventExecutor.this.thread = Thread.currentThread();

                if (SingleThreadEventExecutor.this.interrupted) {

                    SingleThreadEventExecutor.this.thread.interrupt();

                }

                boolean success = false;

                SingleThreadEventExecutor.this.updateLastExecutionTime();

                boolean var112 = false;

                int oldState;

                label1685: {

                    try {

                        var112 = true;

                        //这步其实调用的就是NioEventLoop的run方法

                        SingleThreadEventExecutor.this.run();

                        success = true;

                        var112 = false;

                        break label1685;

                    } catch (Throwable var119) {

                        SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119);

                        var112 = false;

                    } finally {

                        if (var112) {

                            int oldStatex;

                            do {

                                oldStatex = SingleThreadEventExecutor.this.state;

                            } while(oldStatex < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldStatex, 3));

                            if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) {

                                SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");

                            }

                            try {

                                while(!SingleThreadEventExecutor.this.confirmShutdown()) {

                                    ;

                                }

                            } finally {

                                try {

                                    SingleThreadEventExecutor.this.cleanup();

                                } finally {

                                    SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);

                                    SingleThreadEventExecutor.this.threadLock.release();

                                    if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) {

                                        SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ")");

                                    }

                                    SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);

                                }

                            }

                        }

                    }

                    do {

                        oldState = SingleThreadEventExecutor.this.state;

                    } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));

                    if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) {

                        SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");

                    }

                    try {

                        while(!SingleThreadEventExecutor.this.confirmShutdown()) {

                            ;

                        }

                        return;

                    } finally {

                        try {

                            SingleThreadEventExecutor.this.cleanup();

                        } finally {

                            SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);

                            SingleThreadEventExecutor.this.threadLock.release();

                            if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) {

                                SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ")");

                            }

                            SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);

                        }

                    }

                }

                do {

                    oldState = SingleThreadEventExecutor.this.state;

                } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3));

                if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) {

                    SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates.");

                }

                try {

                    while(!SingleThreadEventExecutor.this.confirmShutdown()) {

                        ;

                    }

                } finally {

                    try {

                        SingleThreadEventExecutor.this.cleanup();

                    } finally {

                        SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5);

                        SingleThreadEventExecutor.this.threadLock.release();

                        if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) {

                            SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ")");

                        }

                        SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null);

                    }

                }

            }

        });

    }

    继续看绑定

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

        return this.pipeline.bind(localAddress, promise);

    }

   
   接下来是内部的pipeline调用链
   

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

        return this.tail.bind(localAddress, promise);

    }

    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {

        if (localAddress == null) {

            throw new NullPointerException("localAddress");

        } else if (this.isNotValidPromise(promise, false)) {

            return promise;

        } else {

            final AbstractChannelHandlerContext next = this.findContextOutbound();

            EventExecutor executor = next.executor();

            if (executor.inEventLoop()) {

                next.invokeBind(localAddress, promise);

            } else {

                safeExecute(executor, new Runnable() {

                    public void run() {

                        next.invokeBind(localAddress, promise);

                    }

                }, promise, (Object)null);

            }

            return promise;

        }

    }

    

    //最终调用的是heade的unsafe.bind

    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {

        this.unsafe.bind(localAddress, promise);

    }

    public final void bind(SocketAddress localAddress, ChannelPromise promise) {

        this.assertEventLoop();

        if (promise.setUncancellable() && this.ensureOpen(promise)) {

            if (Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress)localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {

                AbstractChannel.logger.warn("A non-root user can"t receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address (" + localAddress + ") anyway as requested.");

            }

            boolean wasActive = AbstractChannel.this.isActive();

            try {

                AbstractChannel.this.doBind(localAddress);

            } catch (Throwable var5) {

                this.safeSetFailure(promise, var5);

                this.closeIfClosed();

                return;

            }

            if (!wasActive && AbstractChannel.this.isActive()) {

                this.invokeLater(new Runnable() {

                    public void run() {

                        AbstractChannel.this.pipeline.fireChannelActive();

                    }

                });

            }

            this.safeSetSuccess(promise);

        }

    }

    

    //绑定

    protected void doBind(SocketAddress localAddress) throws Exception {

        if (PlatformDependent.javaVersion() >= 7) {

            this.javaChannel().bind(localAddress, this.config.getBacklog());

        } else {

            this.javaChannel().socket().bind(localAddress, this.config.getBacklog());

        }

    }    

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

        return this.pipeline.bind(localAddress, promise);

    }

    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {

        return this.tail.bind(localAddress, promise);

    }

    //HeadContext

    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {

        this.unsafe.bind(localAddress, promise);

    }

    public final void bind(SocketAddress localAddress, ChannelPromise promise) {

        this.assertEventLoop();

        if (promise.setUncancellable() && this.ensureOpen(promise)) {

            if (Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress)localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {

                AbstractChannel.logger.warn("A non-root user can"t receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address (" + localAddress + ") anyway as requested.");

            }

            boolean wasActive = AbstractChannel.this.isActive();

            try {

                //jdk 绑定

                AbstractChannel.this.doBind(localAddress);

            } catch (Throwable var5) {

                this.safeSetFailure(promise, var5);

                this.closeIfClosed();

                return;

            }

            if (!wasActive && AbstractChannel.this.isActive()) {

                //注意 此时才是active 状态 执行下面的流程

                this.invokeLater(new Runnable() {

                    public void run() {

                        AbstractChannel.this.pipeline.fireChannelActive();

                    }

                });

            }

            this.safeSetSuccess(promise);

        }

    }

    //调用jdk绑定端口    

    protected void doBind(SocketAddress localAddress) throws Exception {

        if (PlatformDependent.javaVersion() >= 7) {

            this.javaChannel().bind(localAddress, this.config.getBacklog());

        } else {

            this.javaChannel().socket().bind(localAddress, this.config.getBacklog());

        }

    }


    后续处理
   

public final ChannelPipeline fireChannelActive() {

        AbstractChannelHandlerContext.invokeChannelActive(this.head);

        return this;

    }

    //在head中 还会处理readIfIsAutoRead

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        ctx.fireChannelActive();

        this.readIfIsAutoRead();

    }

        private void readIfIsAutoRead() {

            if (DefaultChannelPipeline.this.channel.config().isAutoRead()) {

                DefaultChannelPipeline.this.channel.read();

            }

        }

    public Channel read() {

        this.pipeline.read();

        return this;

    }    

    

    public final ChannelPipeline read() {

        this.tail.read();

        return this;

    }

    //最终还是到了head 中的unsafe方法  服务监控链接的NioEventLoop 的unsafe 是NioMessageUnsafe  

    public void read(ChannelHandlerContext ctx) {

        this.unsafe.beginRead();

    }    

    public final void beginRead() {

        this.assertEventLoop();

        if (AbstractChannel.this.isActive()) {

            try {

                AbstractChannel.this.doBeginRead();

            } catch (final Exception var2) {

                this.invokeLater(new Runnable() {

                    public void run() {

                        AbstractChannel.this.pipeline.fireExceptionCaught(var2);

                    }

                });

                this.close(this.voidPromise());

            }

        }

    }


            注册SelectionKey.OP_ACCEPT事件 至此 服务端启动完毕  NioEventLoop 也在运行中for(;;)
   

protected void doBeginRead() throws Exception {

        SelectionKey selectionKey = this.selectionKey;

        if (selectionKey.isValid()) {

            this.readPending = true;

            int interestOps = selectionKey.interestOps();

            if ((interestOps & this.readInterestOp) == 0) {

                selectionKey.interestOps(interestOps | this.readInterestOp);

            }

        }

    }


总结:初始化NioEventLoop 这是无限监听线程的封装  初始化channel  绑定NioEventLoop  绑定端口 注册 SelectionKey.OP_ACCEPT

新连接接入
入口
io.netty.channel.nio.NioEventLoop#run 无限循环监听 当有链接进入的时候调用链如下
   

protected void run() {

        while(true) {

            while(true) {

                try {

                    switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) {

                    case -2:

                        continue;

                    case -1:

                        //select 操作

                        this.select(this.wakenUp.getAndSet(false));

                        if (this.wakenUp.get()) {

                            this.selector.wakeup();

                        }

                    default:

                        this.cancelledKeys = 0;

                        this.needsToSelectAgain = false;

                        int ioRatio = this.ioRatio;

                        if (ioRatio == 100) {

                            try {

                                //有链接的时候调用这个

                                this.processSelectedKeys();

                            } finally {

                                this.runAllTasks();

                            }

                        } else {

                            long ioStartTime = System.nanoTime();

                            boolean var13 = false;

                            try {

                                var13 = true;

                                this.processSelectedKeys();

                                var13 = false;

                            } finally {

                                if (var13) {

                                    long ioTime = System.nanoTime() - ioStartTime;

                                    this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);

                                }

                            }

                            long ioTime = System.nanoTime() - ioStartTime;

                            this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);

                        }

                    }

                } catch (Throwable var21) {

                    handleLoopException(var21);

                }

                try {

                    if (this.isShuttingDown()) {

                        this.closeAll();

                        if (this.confirmShutdown()) {

                            return;

                        }

                    }

                } catch (Throwable var18) {

                    handleLoopException(var18);

                }

            }

        }

    }

    private void processSelectedKeys() {

        if (this.selectedKeys != null) {

            this.processSelectedKeysOptimized();

        } else {

            this.processSelectedKeysPlain(this.selector.selectedKeys());

        }

    }

    

    private void processSelectedKeysOptimized() {

        for(int i = 0; i < this.selectedKeys.size; ++i) {

            SelectionKey k = this.selectedKeys.keys[i];

            this.selectedKeys.keys[i] = null;

            Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {

                this.processSelectedKey(k, (AbstractNioChannel)a);

            } else {

                NioTask<SelectableChannel> task = (NioTask)a;

                processSelectedKey(k, task);

            }

            if (this.needsToSelectAgain) {

                this.selectedKeys.reset(i + 1);

                this.selectAgain();

                i = -1;

            }

        }

    }

    //这个方法

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

        NioUnsafe unsafe = ch.unsafe();

        if (!k.isValid()) {

            NioEventLoop eventLoop;

            try {

                eventLoop = ch.eventLoop();

            } catch (Throwable var6) {

                return;

            }

            if (eventLoop == this && eventLoop != null) {

                unsafe.close(unsafe.voidPromise());

            }

        } else {

            try {

                int readyOps = k.readyOps();

                if ((readyOps & 8) != 0) {

                    int ops = k.interestOps();

                    ops &= -9;

                    k.interestOps(ops);

                    unsafe.finishConnect();

                }

                if ((readyOps & 4) != 0) {

                    ch.unsafe().forceFlush();

                }

                if ((readyOps & 17) != 0 || readyOps == 0) {

                    //其实最终调用的是这个方法

                    unsafe.read();

                }

            } catch (CancelledKeyException var7) {

                unsafe.close(unsafe.voidPromise());

            }

        }

    }

        public void read() {

            assert AbstractNioMessageChannel.this.eventLoop().inEventLoop();

            ChannelConfig config = AbstractNioMessageChannel.this.config();

            ChannelPipeline pipeline = AbstractNioMessageChannel.this.pipeline();

            Handle allocHandle = AbstractNioMessageChannel.this.unsafe().recvBufAllocHandle();

            allocHandle.reset(config);

            boolean closed = false;

            Throwable exception = null;

            try {

                int localRead;

                try {

                    do {

                        localRead = AbstractNioMessageChannel.this.doReadMessages(this.readBuf);

                        if (localRead == 0) {

                            break;

                        }

                        if (localRead < 0) {

                            closed = true;

                            break;

                        }

                        allocHandle.incMessagesRead(localRead);

                    } while(allocHandle.continueReading());

                } catch (Throwable var11) {

                    exception = var11;

                }

                localRead = this.readBuf.size();

                for(int i = 0; i < localRead; ++i) {

                    AbstractNioMessageChannel.this.readPending = false;

                    //这块很关键  最终是一个 head->unsafe->ServerBootstrapAcceptor的调用链

                    pipeline.fireChannelRead(this.readBuf.get(i));

                }

                this.readBuf.clear();

                allocHandle.readComplete();

                pipeline.fireChannelReadComplete();

                if (exception != null) {

                    closed = AbstractNioMessageChannel.this.closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);

                }

                if (closed) {

                    AbstractNioMessageChannel.this.inputShutdown = true;

                    if (AbstractNioMessageChannel.this.isOpen()) {

                        this.close(this.voidPromise());

                    }

                }

            } finally {

                if (!AbstractNioMessageChannel.this.readPending && !config.isAutoRead()) {

                    this.removeReadOp();

                }

            }

        }


    接受链接 accept  add到buf list
   

protected int doReadMessages(List<Object> buf) throws Exception {

        SocketChannel ch = SocketUtils.accept(this.javaChannel());

        try {

            if (ch != null) {

                //this 为服务端channel,即NioServerSocketChannel    ch 为jdk创建的客户端channel ch 里面的 readInterestOp为SelectionKey.OP_READ

                buf.add(new NioSocketChannel(this, ch));

                return 1;

            }

        } catch (Throwable var6) {

            logger.warn("Failed to create a new channel from an accepted socket.", var6);

            try {

                ch.close();

            } catch (Throwable var5) {

                logger.warn("Failed to close a socket.", var5);

            }

        }

ServerBootstrapAcceptor 

        public void channelRead(ChannelHandlerContext ctx, Object msg) {

            final Channel child = (Channel) msg;

            //添加业务handler

            //流程如下  io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)   

            //io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)

            //io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)

            //io.netty.channel.DefaultChannelPipeline#callHandlerAdded0

            //io.netty.channel.AbstractChannelHandlerContext#callHandlerAdded

            // handler().handlerAdded(this);  业务加的自定义ChannelInitializer 里面的handlerAdded 方法其实就是调用initChannel方法 也就是我们加handle的常用方式

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            setAttributes(child, childAttrs);

            try {

                //register 过程与服务端监听的channel大致差不多  

                childGroup.register(child).addListener(new ChannelFutureListener() {

                    @Override

                    public void operationComplete(ChannelFuture future) throws Exception {

                        if (!future.isSuccess()) {

                            forceClose(child, future.cause());

                        }

                    }

                });

            } catch (Throwable t) {

                forceClose(child, t);

            }

        }

//workerGroup

public ChannelFuture register(Channel channel) {

    return this.next().register(channel);

}

public ChannelFuture register(Channel channel, ChannelPromise promise) {

    if(channel == null) {

        throw new NullPointerException("channel");

    } else if(promise == null) {

        throw new NullPointerException("promise");

    } else {

        //这里的unsafe是NioSocketChannel$NioSocketChannelUnsafe

        channel.unsafe().register(this, promise);

        return promise;

    }

}

public final void register(EventLoop eventLoop, final ChannelPromise promise) {

    if(eventLoop == null) {

        throw new NullPointerException("eventLoop");

    } else if(AbstractChannel.this.isRegistered()) {

        promise.setFailure(new IllegalStateException("registered to an event loop already"));

    } else if(!AbstractChannel.this.isCompatible(eventLoop)) {

        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));

    } else {

        AbstractChannel.this.eventLoop = eventLoop;

        // 开始真正的异步  此时线程是boss线程 而不是worder线程 会异步  同时启动NioEventLoop的run方法

        if(eventLoop.inEventLoop()) {

            this.register0(promise);

        } else {

            try {

                eventLoop.execute(new OneTimeTask() {

                    public void run() {

                        AbstractUnsafe.this.register0(promise);

                    }

                });

            } catch (Throwable var4) {

                AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);

                this.closeForcibly();

                AbstractChannel.this.closeFuture.setClosed();

                this.safeSetFailure(promise, var4);

            }

        }

 

    }

}

//同上

private void register0(ChannelPromise promise) {

}

protected void doRegister() throws Exception {

    boolean selected = false;

 

    while(true) {

        try {

            this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);

            return;

        } catch (CancelledKeyException var3) {

            if(selected) {

                throw var3;

            }

 

            this.eventLoop().selectNow();

            selected = true;

        }

    }

}

public ChannelPipeline fireChannelActive() {

    this.head.fireChannelActive();

    if(this.channel.config().isAutoRead()) {

        this.channel.read();

    }

 

    return this;

}

public ChannelHandlerContext read() {

    final AbstractChannelHandlerContext next = this.findContextOutbound();

    EventExecutor executor = next.executor();

    if(executor.inEventLoop()) {

        next.invokeRead();

    } else {

        Runnable task = next.invokeReadTask;

        if(task == null) {

            next.invokeReadTask = task = new Runnable() {

                public void run() {

                    next.invokeRead();

                }

            };

        }

 

        executor.execute(task);

    }

 

    return this;

}

protected void doBeginRead() throws Exception {

    if(!this.inputShutdown) {

        SelectionKey selectionKey = this.selectionKey;

        if(selectionKey.isValid()) {

            this.readPending = true;

            int interestOps = selectionKey.interestOps();

            if((interestOps & this.readInterestOp) == 0) {

                selectionKey.interestOps(interestOps | this.readInterestOp);

            }

 

        }

    }

}

当客户端链接有读事件时,也会调用unsafe.read()方法 不过此时unsafe是NioByteUnsafe
       

 public final void read() {

            final ChannelConfig config = config();

            final ChannelPipeline pipeline = pipeline();

            final ByteBufAllocator allocator = config.getAllocator();

            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

            allocHandle.reset(config);

            ByteBuf byteBuf = null;

            boolean close = false;

            try {

                do {

                    byteBuf = allocHandle.allocate(allocator);

                    allocHandle.lastBytesRead(doReadBytes(byteBuf));

                    if (allocHandle.lastBytesRead() <= 0) {

                        // nothing was read. release the buffer.

                        byteBuf.release();

                        byteBuf = null;

                        close = allocHandle.lastBytesRead() < 0;

                        break;

                    }

                    allocHandle.incMessagesRead(1);

                    readPending = false;

                    //这里 从而实现了处理数据的能力

                    pipeline.fireChannelRead(byteBuf);

                    byteBuf = null;

                } while (allocHandle.continueReading());

                allocHandle.readComplete();

                pipeline.fireChannelReadComplete();

                if (close) {

                    closeOnRead(pipeline);

                }

            } catch (Throwable t) {

                handleReadException(pipeline, byteBuf, t, close, allocHandle);

            } finally {

              

                if (!readPending && !config.isAutoRead()) {

                    removeReadOp();

                }

            }

        }

    }


    

        
netty是在哪里检测到有新连接接入的?
boss线程执行的第一个过程,轮询出accpet事件,然后第二个过程通过jdk底层的accept方法,去创建这个连接
新连接时怎么注册到NioEventLoop线程的
简单来说,boss线程通过获取chooser的next()方法,拿到一个NioEventLoop。然后将新连接注册到这个NioEventLoop上的selector。

一个NioEventLoop对应一个selector

pipeline
在AbstractChannel初始化

入站事件
ChannelPipeline中的传播顺序为:head ——> ...... ——> tail。
出站事件
ChannelPipeline中各个ChannelHandler的执行顺序是:tail ——> ...... ——> head。

以上是 netty总结服务端启动流程 的全部内容, 来源链接: utcz.com/z/516509.html

回到顶部