netty 4.x
rocketmq NettyServer#doOpen 添加 ChannelHandler 时,使用了 ChannelPipeline#addLast(EventExecutorGroup, ChannelHandler...) 方法,这个方法在每个 ChannelHandler 执行时都会使用 EventExecutorGroup
ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                             // 就是这个 addLast 方法
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                new NettyServerHandler()
                            );
                    }
                });
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        // 拿到当前 ChannelHandler 指定的线程池
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            // 因为这个线程池不是 eventLoopGroupSelector 的,所以会走这边
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
然后执行完这个方法就返回了,所以又会去执行 NioByteUnsafe#read 里面的 do while,重新执行上面的 1.2.3.4。然后又去执行 ByteToMessageDecoder#channelRead。 假设第一次 ByteToMessageDecoder#channelRead 还没执行,第二次 ByteToMessageDecoder#channelRead 也有可能会先执行了吧?
第二行复制错了是 rocketmq NettyRemotingServer#start 最后一段 重新执行上面的 1.2.3.4 写错了,是 重新执行上面的 2.3.4 第一次写有点紧张,sorry
打扰大家了,已经看到为什么了
DefaultChannelPipeline#childExecutor
private EventExecutor childExecutor(EventExecutorGroup group) {
        if (group == null) {
            return null;
        }
        Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
        if (pinEventExecutor != null && !pinEventExecutor) {
            return group.next();
        }
        Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
        if (childExecutors == null) {
            // Use size of 4 as most people only use one extra EventExecutor.
            childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
        }
        // Pin one of the child executors once and remember it so that the same child executor
        // is used to fire events for the same channel.
        EventExecutor childExecutor = childExecutors.get(group);
        if (childExecutor == null) {
            childExecutor = group.next();
            childExecutors.put(group, childExecutor);
        }
        return childExecutor;
    }
应该一个 channel 会指定一个 SingleThreadEventExecutor
|  |      1mazai      2019-12-02 17:21:17 +08:00 netty 是不存在多线程的问题的,因为一个 Channel 注册的时候只会只会注册到一个 EventLoop 上( SingleThreadEventExecutor 本质上就是 EventLoop ),在注册的时候会判断当前执行注册的线程是不是 EventLoop 所在的线程,如果是就注册,否则就使用 EventLoop 所在的线程发起一个任务执行注册。一个 EventLoop 上会注册多个 Channel,所以在 EventLoop 维护的 Channel 生命周期中只会使用一个线程执行,因此也就不存在多线程的问题! 注册的部分可以看 AbstractUnsafe.register(); |