NIO 如下代码怎么绕过死锁?还是说我写的不对?如下代码

2020-06-04 18:19:41 +08:00
 Aruforce
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class NIO2 {
    private static volatile  boolean keepRunning = true;
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8090), 128);
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        Selector socketChannelSelector = Selector.open();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (keepRunning) {
                        int select = selector.select();
                        if (select > 0) {
                            Set<SelectionKey> selectedKeys = selector.selectedKeys();
                            Iterator<SelectionKey> iterator = selectedKeys.iterator();
                            while (iterator.hasNext()) {
                                SelectionKey next = iterator.next();
                                if (next.isValid()){
                                    if(next.isAcceptable()) {
                                        SocketChannel accept = ((ServerSocketChannel) next.channel()).accept();
                                        accept.configureBlocking(false);
                                        // thread block 怎么绕过 还是说我用的不对
                                        accept.register(socketChannelSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                    }
                                }else {
                                    socketChannelSelector.keys().remove(next);
                                }
                                iterator.remove();
                            }
                        }
                    }
                }catch (IOException e){
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (keepRunning) {
                        int select = socketChannelSelector.select();
                        if (select > 0) {
                            Set<SelectionKey> selectedKeys = socketChannelSelector.selectedKeys();
                            Iterator<SelectionKey> iterator = selectedKeys.iterator();
                            while (iterator.hasNext()) {
                                SelectionKey next = iterator.next();
                                if (next.isReadable()) {

                                } else if (next.isWritable()) {
                                    ByteBuffer byteBuffe = ByteBuffer.allocate(4);
                                    int l = (int)(System.currentTimeMillis() / 1000L + 2208988800L);
                                    byteBuffe.put(fromInt(l));
                                    byteBuffe.flip();
                                    ((SocketChannel) next.channel()).write(byteBuffe);
                                    ((SocketChannel) next.channel()).close();
                                }
                                iterator.remove();
                            }
                        }
                    }
                }catch (IOException e){
                    e.printStackTrace();
                }
            }
        }).start();

    }

    public static byte[] fromInt(int i) {
        byte[] result = new byte[4];
        result[3] = (byte) (i);
        result[2] = (byte) (i >> 8);
        result[1] = (byte) (i >> 16);
        result[0] = (byte) (i >> 24);
        return result;
    }

    public static int fromByteArray(byte[] bytes) {
        if (bytes.length != 4) {
            throw new IllegalArgumentException("bytes array length = " + bytes.length);
        }
        //
        int a = bytes[3] & 0x000000ff;
        a |= (bytes[2] << 8) & 0x0000ffff;
        a |= (bytes[1] << 16) & 0x00ffffff;
        a |= (bytes[0] << 24) & 0xFFffffff;
        return a;
    }
}
2973 次点击
所在节点    程序员
11 条回复
Aruforce
2020-06-04 18:50:20 +08:00
select 和 registe 都要获取 Selector publicKeys 的锁....
是不是不该这么用啊...
badteeth
2020-06-04 19:53:09 +08:00
在 selector 迭代时是不能注册新的 channel 的;你可以把 accept 的所有注册操作合并到一次 selector 的迭代循环之后;
Aruforce
2020-06-04 21:17:48 +08:00
@badteeth ……我是想一个 selector 负责 serversocketchannel 建连接…一个 selector 负责 socketchannel 读写状态…但是同一个 selector select 和 registe 是不能并行的 都要滚去 selector publishkey 的锁……会死锁……我绕不过去了……
Aruforce
2020-06-04 21:22:11 +08:00
@badteeth 正常情况应该是一个端口一个 selector…单线程负责当前端口的全部的建连接及 socketchannel 的 registe……还有个线程池负责真正的读写数据和处理业务逻辑… 我想问的是怎么绕过去…selector.wakeup 不好使…
arloor
2020-06-04 21:56:56 +08:00
问题确实是 select 和 register 都要获取同样的锁。

你现在是启动两个线程,一个 select-accept-register,另一个 select-read/write 。除了锁的问题哈,还有你会丢失 accept 和读写事件,线程 1 会忽略读写,线程 2 会忽略 accept 。

楼主知道 reactor 模式吗?一个线程负责接收事件( accept 、read 、write ),后面一个线程池负责处理这些事件。

应该长这样:一个线程 select-accept/read/write->由线程池来处理不同事件( accept 可以直接在原线程进行 register )

PS:实践到这里差不多了,可以看 netty 了。先用用 netty,然后看下 netty 源码,相信就能清楚。
MoHen9
2020-06-04 22:48:02 +08:00
Selector 用两个,分为主从,主 Selector 专门负责 ServerSocketChannel 的 accept 事件,因为建立连接只有一次,从 Selector 负责 SocketChannel 的读写事件,主 ServerSocketChannel 是没有读写事件的; Netty 里面是每个 SocketChannel 线程都有一个 Selector,ServerSocketChannel 的 Selector 只用了一个。
Aruforce
2020-06-05 13:07:35 +08:00
@arloor socketchannelSelector 不需要处理 accept 啊... serverSocketChannel 不需要处理 read write 。。。

你说的 reator 模式 ( 1 个线程循环处理 selector.selectedKeys 处理好 accept 及 registe 并且将 read/write dispatch 给线程池来处理)

我是想把 所有的 socketchannel 的 read/write 交给另一个 selector... 但是碰到了死锁。。。想写个 Wrapper 但是没写出来
Aruforce
2020-06-05 13:13:40 +08:00
@MoHen9 主 ServerSocketChannel accept 的 socketChannel 向 从 selector registe 的时候会碰到 锁竞争。。。 我不知道怎么绕过去 从 selector.wakeup() 并不好用 能注册上是运气好 CPU 先执行了主 Selector 的线程...还有像一部份没办法注册上的。。。
从 selector.select(1) 这样 其实也不对...如果主 selector accept 的 keyset 很大的话 就走不下去了
arloor
2020-06-05 14:39:44 +08:00
@Aruforce 看错了,没看到你用了两个 selector 。

你现在问题是线程 1 register 、线程 2 select,死锁了。解决很简单,用个生产者消费者模型,把要 register 的 channel 放到队列中,线程二在每次 select 前先 register 队列中的 channel 。这时候你可能又要问了,万一线程 2 一直阻塞在 select 时,怎么办? 答案:用 selectNow()或 select ( timeout )。


刚刚看了 netty 源码中怎么进行 register 的,他也是生产者消费者模型,由进行 select 的线程来 register 。代码如下:

AbstractChannel.java (Netty 4.1):

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise); // 这里不会走到
} else {
try {
// eventLoop 就是 netty 中线程,提交一个 register 任务,后面会被执行
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
Aruforce
2020-06-08 09:43:35 +08:00
@arloor
0. Wrapper 这条路 实在没走通;
1. 遵从你的建议...serverSocketChannelSelector 不要直接给了放缓存对队列...socketChannelSelector 自己来拿吧...
3. 但是 socketChannelSelector 有空转的问题而且如果 blockingDeque 缓存的比较多的话...感觉有问题

```
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingDeque;

public class NIO2 {
private static volatile boolean keepRunning = true;
private static LinkedBlockingDeque<SocketChannel> blockingDeque = new LinkedBlockingDeque<SocketChannel>();
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8090), 128);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
Selector socketChannelSelector = Selector.open();
new Thread(new Runnable() {
@Override
public void run() {
try {
while (keepRunning) {
int select = selector.select();
if (select > 0) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
if (next.isValid()) {
if (next.isAcceptable()) {
SocketChannel accept = ((ServerSocketChannel) next.channel()).accept();
accept.configureBlocking(false);
blockingDeque.add(accept);
}
} else {
socketChannelSelector.keys().remove(next);
}
iterator.remove();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
while (keepRunning) {
if (!blockingDeque.isEmpty()) {
blockingDeque.removeIf(value->{
try {
value.register(socketChannelSelector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
} catch (ClosedChannelException e) {
e.printStackTrace();
return false;
}
return true;
});
}
// 又是在空转。。。
int select = socketChannelSelector.select(1);
if (select > 0) {
Set<SelectionKey> selectedKeys = socketChannelSelector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
if (next.isReadable()) {

} else if (next.isWritable()) {
ByteBuffer byteBuffe = ByteBuffer.allocate(4);
int l = (int) (System.currentTimeMillis() / 1000L + 2208988800L);
byteBuffe.put(fromInt(l));
byteBuffe.flip();
((SocketChannel) next.channel()).write(byteBuffe);
((SocketChannel) next.channel()).close();
}
iterator.remove();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();

}

public static byte[] fromInt(int i) {
byte[] result = new byte[4];
result[3] = (byte) (i);
result[2] = (byte) (i >> 8);
result[1] = (byte) (i >> 16);
result[0] = (byte) (i >> 24);
return result;
}

public static int fromByteArray(byte[] bytes) {
if (bytes.length != 4) {
throw new IllegalArgumentException("bytes array length = " + bytes.length);
}
//
int a = bytes[3] & 0x000000ff;
a |= (bytes[2] << 8) & 0x0000ffff;
a |= (bytes[1] << 16) & 0x00ffffff;
a |= (bytes[0] << 24) & 0xFFffffff;
return a;
}
}
```
arloor
2020-06-08 11:38:00 +08:00
@Aruforce 这代码我运行了下,用 nc localhost 8090 测试了下,没有问题啊

你说的空转是什么意思?

不要总是感觉有问题,永远没有最优解,都是权衡和妥协。最怕你觉得这样有问题,然后就什么都不做。

优化是后面的事。

另外,说真的,就是这样了,不要觉得有啥问题。真可以增加的就是线程池了

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/678682

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX