import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class NIO2 {
private static volatile boolean keepRunning = true;
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8090), 128);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
Selector socketChannelSelector = Selector.open();
new Thread(new Runnable() {
@Override
public void run() {
try {
while (keepRunning) {
int select = selector.select();
if (select > 0) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
if (next.isValid()){
if(next.isAcceptable()) {
SocketChannel accept = ((ServerSocketChannel) next.channel()).accept();
accept.configureBlocking(false);
// thread block 怎么绕过 还是说我用的不对
accept.register(socketChannelSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}else {
socketChannelSelector.keys().remove(next);
}
iterator.remove();
}
}
}
}catch (IOException e){
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
while (keepRunning) {
int select = socketChannelSelector.select();
if (select > 0) {
Set<SelectionKey> selectedKeys = socketChannelSelector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
if (next.isReadable()) {
} else if (next.isWritable()) {
ByteBuffer byteBuffe = ByteBuffer.allocate(4);
int l = (int)(System.currentTimeMillis() / 1000L + 2208988800L);
byteBuffe.put(fromInt(l));
byteBuffe.flip();
((SocketChannel) next.channel()).write(byteBuffe);
((SocketChannel) next.channel()).close();
}
iterator.remove();
}
}
}
}catch (IOException e){
e.printStackTrace();
}
}
}).start();
}
public static byte[] fromInt(int i) {
byte[] result = new byte[4];
result[3] = (byte) (i);
result[2] = (byte) (i >> 8);
result[1] = (byte) (i >> 16);
result[0] = (byte) (i >> 24);
return result;
}
public static int fromByteArray(byte[] bytes) {
if (bytes.length != 4) {
throw new IllegalArgumentException("bytes array length = " + bytes.length);
}
//
int a = bytes[3] & 0x000000ff;
a |= (bytes[2] << 8) & 0x0000ffff;
a |= (bytes[1] << 16) & 0x00ffffff;
a |= (bytes[0] << 24) & 0xFFffffff;
return a;
}
}
1
Aruforce OP select 和 registe 都要获取 Selector publicKeys 的锁....
是不是不该这么用啊... |
2
badteeth 2020-06-04 19:53:09 +08:00
在 selector 迭代时是不能注册新的 channel 的;你可以把 accept 的所有注册操作合并到一次 selector 的迭代循环之后;
|
3
Aruforce OP @badteeth ……我是想一个 selector 负责 serversocketchannel 建连接…一个 selector 负责 socketchannel 读写状态…但是同一个 selector select 和 registe 是不能并行的 都要滚去 selector publishkey 的锁……会死锁……我绕不过去了……
|
4
Aruforce OP @badteeth 正常情况应该是一个端口一个 selector…单线程负责当前端口的全部的建连接及 socketchannel 的 registe……还有个线程池负责真正的读写数据和处理业务逻辑… 我想问的是怎么绕过去…selector.wakeup 不好使…
|
5
arloor 2020-06-04 21:56:56 +08:00
问题确实是 select 和 register 都要获取同样的锁。
你现在是启动两个线程,一个 select-accept-register,另一个 select-read/write 。除了锁的问题哈,还有你会丢失 accept 和读写事件,线程 1 会忽略读写,线程 2 会忽略 accept 。 楼主知道 reactor 模式吗?一个线程负责接收事件( accept 、read 、write ),后面一个线程池负责处理这些事件。 应该长这样:一个线程 select-accept/read/write->由线程池来处理不同事件( accept 可以直接在原线程进行 register ) PS:实践到这里差不多了,可以看 netty 了。先用用 netty,然后看下 netty 源码,相信就能清楚。 |
6
MoHen9 2020-06-04 22:48:02 +08:00
Selector 用两个,分为主从,主 Selector 专门负责 ServerSocketChannel 的 accept 事件,因为建立连接只有一次,从 Selector 负责 SocketChannel 的读写事件,主 ServerSocketChannel 是没有读写事件的; Netty 里面是每个 SocketChannel 线程都有一个 Selector,ServerSocketChannel 的 Selector 只用了一个。
|
7
Aruforce OP @arloor socketchannelSelector 不需要处理 accept 啊... serverSocketChannel 不需要处理 read write 。。。
你说的 reator 模式 ( 1 个线程循环处理 selector.selectedKeys 处理好 accept 及 registe 并且将 read/write dispatch 给线程池来处理) 我是想把 所有的 socketchannel 的 read/write 交给另一个 selector... 但是碰到了死锁。。。想写个 Wrapper 但是没写出来 |
8
Aruforce OP @MoHen9 主 ServerSocketChannel accept 的 socketChannel 向 从 selector registe 的时候会碰到 锁竞争。。。 我不知道怎么绕过去 从 selector.wakeup() 并不好用 能注册上是运气好 CPU 先执行了主 Selector 的线程...还有像一部份没办法注册上的。。。
从 selector.select(1) 这样 其实也不对...如果主 selector accept 的 keyset 很大的话 就走不下去了 |
9
arloor 2020-06-05 14:39:44 +08:00
@Aruforce 看错了,没看到你用了两个 selector 。
你现在问题是线程 1 register 、线程 2 select,死锁了。解决很简单,用个生产者消费者模型,把要 register 的 channel 放到队列中,线程二在每次 select 前先 register 队列中的 channel 。这时候你可能又要问了,万一线程 2 一直阻塞在 select 时,怎么办? 答案:用 selectNow()或 select ( timeout )。 刚刚看了 netty 源码中怎么进行 register 的,他也是生产者消费者模型,由进行 select 的线程来 register 。代码如下: AbstractChannel.java (Netty 4.1): @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); // 这里不会走到 } else { try { // eventLoop 就是 netty 中线程,提交一个 register 任务,后面会被执行 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } |
10
Aruforce OP @arloor
0. Wrapper 这条路 实在没走通; 1. 遵从你的建议...serverSocketChannelSelector 不要直接给了放缓存对队列...socketChannelSelector 自己来拿吧... 3. 但是 socketChannelSelector 有空转的问题而且如果 blockingDeque 缓存的比较多的话...感觉有问题 ``` import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; import java.util.concurrent.LinkedBlockingDeque; public class NIO2 { private static volatile boolean keepRunning = true; private static LinkedBlockingDeque<SocketChannel> blockingDeque = new LinkedBlockingDeque<SocketChannel>(); public static void main(String[] args) throws Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(8090), 128); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); Selector socketChannelSelector = Selector.open(); new Thread(new Runnable() { @Override public void run() { try { while (keepRunning) { int select = selector.select(); if (select > 0) { Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey next = iterator.next(); if (next.isValid()) { if (next.isAcceptable()) { SocketChannel accept = ((ServerSocketChannel) next.channel()).accept(); accept.configureBlocking(false); blockingDeque.add(accept); } } else { socketChannelSelector.keys().remove(next); } iterator.remove(); } } } } catch (IOException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { while (keepRunning) { if (!blockingDeque.isEmpty()) { blockingDeque.removeIf(value->{ try { value.register(socketChannelSelector,SelectionKey.OP_READ|SelectionKey.OP_WRITE); } catch (ClosedChannelException e) { e.printStackTrace(); return false; } return true; }); } // 又是在空转。。。 int select = socketChannelSelector.select(1); if (select > 0) { Set<SelectionKey> selectedKeys = socketChannelSelector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey next = iterator.next(); if (next.isReadable()) { } else if (next.isWritable()) { ByteBuffer byteBuffe = ByteBuffer.allocate(4); int l = (int) (System.currentTimeMillis() / 1000L + 2208988800L); byteBuffe.put(fromInt(l)); byteBuffe.flip(); ((SocketChannel) next.channel()).write(byteBuffe); ((SocketChannel) next.channel()).close(); } iterator.remove(); } } } } catch (IOException e) { e.printStackTrace(); } } }).start(); } public static byte[] fromInt(int i) { byte[] result = new byte[4]; result[3] = (byte) (i); result[2] = (byte) (i >> 8); result[1] = (byte) (i >> 16); result[0] = (byte) (i >> 24); return result; } public static int fromByteArray(byte[] bytes) { if (bytes.length != 4) { throw new IllegalArgumentException("bytes array length = " + bytes.length); } // int a = bytes[3] & 0x000000ff; a |= (bytes[2] << 8) & 0x0000ffff; a |= (bytes[1] << 16) & 0x00ffffff; a |= (bytes[0] << 24) & 0xFFffffff; return a; } } ``` |