V2EX = way to explore
V2EX 是一个关于分享和探索的地方
Sign Up Now
For Existing Member  Sign In
Joker123456789
V2EX  ›  Java

关于 NIO 非阻塞的问题

  •  
  •   Joker123456789 · Jan 27, 2021 · 3064 views
    This topic created in 1917 days ago, the information mentioned may be changed or developed.

    下面这段代码,在内部循环的 “else if (selectionKey.isReadable()) {” 这个 if 里面, 如果用阻塞的方式进行数据读取,处理业务逻辑,响应等一系列操作,会造成整个循环阻塞,造成 NIO 的非阻塞特性丢失。

    请问有什么办法 可以解决这个问题?

    private void doMonitor(Selector selector) throws Exception {
            while (true) {
                int eventCountTriggered = selector.select();
                if (eventCountTriggered == 0) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey selectionKey = it.next();
                    it.remove();
    
                    if (selectionKey.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                        try {
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } catch (IOException e) {
                            log.error("注册 SocketChannel 异常", e);
                        }
                    } else if (selectionKey.isReadable()) {
                        // 读取数据并处理业务逻辑,然后响应数据
                    }
                }
            }
    }
    
    17 replies    2021-01-27 17:24:18 +08:00
    zhuawadao
        1
    zhuawadao  
       Jan 27, 2021
    多线程啊,消息队列啊
    Joker123456789
        2
    Joker123456789  
    OP
       Jan 27, 2021
    @zhuawadao 多线程 报错,早试过了。
    cuantianhou
        3
    cuantianhou  
       Jan 27, 2021
    else if 里面的数据已经是准备好的了,不会阻塞了,但是你处理的逻辑如果太耗时,就单独起个线程处理
    Joker123456789
        4
    Joker123456789  
    OP
       Jan 27, 2021
    @cuantianhou 我刚才试验过,发起一个请求,在 else if 里的东西执行完之前,接着发起第二个请求,就被阻塞了。 不知道什么原因
    oxromantic
        5
    oxromantic  
       Jan 27, 2021
    @Joker123456789 “在 else if 里的东西执行完之前” NIO 是单线程模型,吃不消你在他的线程执行业务逻辑的,你只能在他的线程做数据读取操作,并立刻交还权限,把任务丢给其他线程处理
    sagaxu
        6
    sagaxu  
       Jan 27, 2021 via Android
    你不能阻塞 else if,也不能在里面消耗过多 CPU,一般不宜超过 10ms
    yamasa
        7
    yamasa  
       Jan 27, 2021
    NIO 的 IO 线程只应该负责 poll 各个端口,poll 到 IO 事件要进行业务处理的话肯定得分发切换到业务线程池,你这个写法就不对。Netty 里也有类似你这段代码 while true poll IO event 的核心逻辑,建议看看。
    liian2019
        8
    liian2019  
       Jan 27, 2021
    业务处理用多线程处理,另外,业务处理完成手动调用 selector.wakeup(),会立刻唤醒 selector 。可以看看单 Reactor 多线程模型,多 Reactor 多线程模型( netty 就是这么搞的)。
    v2orz
        9
    v2orz  
       Jan 27, 2021
    如 3L 所说,正常情况下读取数据并处理应该很快
    如果你的业务逻辑太重导致执行慢,应该考虑使用 Future f = executorService.submit(runnable)的方式(或者其他多线程方式)
    v2orz
        10
    v2orz  
       Jan 27, 2021
    这里本身就应该有个业务线程池,不断把读到的数据提交给业务线程
    zhuawadao
        11
    zhuawadao  
       Jan 27, 2021
    @Joker123456789 思路没错,多线程报错是多线程的问题。
    badbye
        12
    badbye  
       Jan 27, 2021
    使用 IO 多复用时不应该用来跑会阻塞的任务,你这种情况只能将阻塞的任务改成异步的
    liian2019
        13
    liian2019  
       Jan 27, 2021
    接 8L,可以参考下最简单的示例代码
    if (next.isAcceptable()){
    ...
    }
    else if (next.isReadable()){
    SocketChannel readChannel = (SocketChannel) next.channel();
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    ...
    Response message = new Response(messageByte);
    next.attach(message);
    threadPool.execute(() -> {
    log.info("接收到消息 {} ... 处理中...",message);
    ...
    message.setStatus(true);
    message.setData(...)
    next.interestOps(SelectionKey.OP_WRITE);
    selector.wakeup();

    });
    }
    else {
    String responseMessage = next.attachment().toString();
    SocketChannel writeChannel = (SocketChannel) next.channel();
    writeChannel.write(ByteBuffer.wrap(responseMessage.getBytes()));
    next.interestOps(SelectionKey.OP_READ);
    }
    mazai
        14
    mazai  
       Jan 27, 2021
    NIO 是同步非阻塞,非阻塞这个特性只是相对于 OS 层面是非阻塞的,代码层面还是同步的,第一次发起请求,来了一个 OP_READ 事件,单线程开始处理 isReadable 里的代码逻辑,这时候你再发一次请求必然会等待这个线程处理完代码逻辑。

    确实可以参考 Reactor 线程模型,来一个 master 线程,负责接受请求,接受完请求把这些请求分发给多个 worker 来处理业务逻辑,这样就不会阻塞你的请求了。
    wucao219101
        15
    wucao219101  
       Jan 27, 2021
    你这个代码相当于单线程来处理所有的 IO 事件,如果处理事件的业务逻辑本身有阻塞耗时的逻辑,那么肯定要另外再维护一个线程池。

    Node.js 能单线程处理的原因是它所有的业务逻辑都是异步的,没有阻塞的任务存在。Java 不一样,你查询 DB 、调用外部接口、读写文件等,都会阻塞当前线程,所以不去弄个线程池性能肯定上不去。就是 Netty 他也是最好用外部线程池 EventExecutorGroup 来处理业务逻辑的。
    Joker123456789
        16
    Joker123456789  
    OP
       Jan 27, 2021
    @wucao219101
    @mazai
    @v2orz

    用线程池做了以后,现在的问题是这样的

    ```java
    while (it.hasNext()) {
    SelectionKey selectionKey = it.next();
    it.remove();

    if (selectionKey.isAcceptable()) {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
    try {
    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.register(selector, SelectionKey.OP_READ);
    } catch (IOException e) {
    log.error("注册 SocketChannel 异常", e);
    }
    } else if (selectionKey.isReadable()) {
    log.info("进来了=========================================");
    Request request = new Request();
    request.selectionKey = selectionKey;
    request.selector = selector;

    Executors.newCachedThreadPool().execute(request);

    // selector.wakeup();
    }
    }
    ```

    直发起了一个请求,但是 这句 log 打印了好多次。 控制台还报异常。
    mightofcode
        17
    mightofcode  
       Jan 27, 2021
    你有两个选择:
    1,doMonitor 这个线程里面所有操作都是异步的,不存在任何阻塞
    2,另开线程进行处理阻塞任务
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   3747 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 48ms · UTC 10:42 · PVG 18:42 · LAX 03:42 · JFK 06:42
    ♥ Do have faith in what you're doing.