首先这是一个使用 Netty 框架,并用 UDP 来通信的 Demo。
主要类如下,代码有点多,可以主要看类 3 与类 5
1、用于统计并输出客户端数量的 GetDataHandlerAdapter
public class GetDataHandlerAdapter extends ChannelInboundHandlerAdapter {
private static Logger logger = LogManager.getLogger(GetDataHandlerAdapter.class);
private static volatile AtomicInteger count = new AtomicInteger(1);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info(count.getAndIncrement());
}
}
2、服务端 RPCAgentServer
public class RPCAgentServer {
private GetDataHandlerAdapter getDataHandlerAdapter = new GetDataHandlerAdapter();
public void listen(int port) throws InterruptedException {
Bootstrap serverBootstrap = new Bootstrap();
serverBootstrap.handler(new ChannelInitializer<DatagramChannel>() {
@Override
public void initChannel(DatagramChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(getDataHandlerAdapter);
}
});
serverBootstrap.group(new NioEventLoopGroup(1))
.channel(NioDatagramChannel.class)
.bind(port)
.await();
}
}
3、客户端 RPCAgentClient
public class RPCAgentClient {
private String remoteHost;
private int remotePort;
private int localPort;
private Channel channel;
public static RPCAgentClient getNewClient(String remoteHost, int remotePort) {
RPCAgentClient rpcAgentClient = new RPCAgentClient();
rpcAgentClient.remoteHost = remoteHost;
rpcAgentClient.remotePort = remotePort;
rpcAgentClient.localPort = new Random().nextInt(15000) + 10000;
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup(1))
.channel(NioDatagramChannel.class)
.handler(new ChannelInboundHandlerAdapter());
bootstrap.localAddress(rpcAgentClient.localPort);
rpcAgentClient.channel = bootstrap.bind().channel();
} catch (Exception e) {
e.printStackTrace();
}
return rpcAgentClient;
}
public void close() {
if (channel != null) {
channel.close();
}
}
public void sendMsg(String msg) {
try {
channel.writeAndFlush(buildUdpMsg(remoteHost, remotePort, msg));
} catch (Exception e) {
e.printStackTrace();
}
}
private static DatagramPacket buildUdpMsg(String remoteHost, int remotePort, String msg) {
return new DatagramPacket(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8),
new InetSocketAddress(remoteHost, remotePort));
}
}
4、服务端测试类 NettyServerTest
public class NettyServerTest {
public static final int port = 8289;
public static void main(String[] args) {
try {
new RPCAgentServer().listen(port);
System.out.println(" service start finish ");
System.out.println(new Scanner(System.in).next());
} catch (Exception e) {
e.printStackTrace();
}
}
}
5、 客户端测试类 NettyClientTest,启动 30 个客户端并发送消息 public class NettyClientTest {
private static final String remoteHost = "127.0.0.1";
public static void main(String[] args) {
System.out.println("start start devices");
ExecutorService executorService = Executors.newFixedThreadPool(30);
for (int i = 0; i < 30; i++) {
executorService.execute(() -> {
RPCAgentClient rpcAgentClient = RPCAgentClient.getNewClient(remoteHost, NettyServerTest.port);
rpcAgentClient.sendMsg("1");
rpcAgentClient.close();
});
}
try {
executorService.shutdown();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("start devices finish");
}
}
启动顺序是先启动 NettyServerTest,再启动 NettyClientTest,观察 NettyServerTest 输出的计数。
以现在的代码 NettyServerTest 输出的是正确的数量 30
但是如果我在 RPCAgentClient 的 getNewClient 方法上加上 synchronized 关键字,不要问我为什么要加。。getNewClient 方法就变成
public synchronized static RPCAgentClient getNewClient(String remoteHost, int remotePort)
那么 NettyServerTest 输出的计数将小于 30,而且在我的电脑上的结果是小于 20。
那么问题来了,这是为什么?
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.