求助 C#有办法在自己程序内部实现一个跨线程的简易消息队列吗?现在这种轮询数据库的实现 CPU 占用率很高, 4C8G 阿 里 云占用一直在 100%

2023-11-22 12:58:21 +08:00
 drymonfidelia

需求是几百个客户端不断给我们上报一些数据(加起来每分钟 2000 条左右)我们分类后上报给不同上游。上游的接口设计非常差劲(按照同一份文档),一次只能接受一条数据,有的上游一个请求 3 分钟才响应。上游的程序不是我们能控制的,我们也没权利要求他们修改。

目前我的设计是一个独立 ASP.Net Core 程序接受数据存入数据库(因为有在线率要求,处理任务的程序需要经常重启更新,有的时候会更新坏掉),另一个程序每 2 秒查询一次数据库的新数据,按需要上报的上游分类好进入 5 个不同队列(不能接受数据的时候就分类,因为分类的逻辑也要经常改),另外启动的时候开 5 个线程在数据库里扫描这些队列,发现新的任务就开一个异步 Task 上报。不同上游能接受的并发不一样,Task 外面有个 semaphoreSlim.WaitAsync();防止把上游服务弄炸。这种实现 CPU 占用率很高,4C8G 阿 里 云占用一直在 100%,有没有人知道最佳实现是什么?

3004 次点击
所在节点    .NET
27 条回复
thinkershare
2023-11-22 13:19:17 +08:00
跨线程? 线程不是本身就可以直接共享数据吗? 你需要的是一个多线程安全的进程内消息队列吧?
感觉是你的实现不对。开 5 个线程扫描干啥? 后台任务 1 个线程就够了,你又不是 CPU 限制的计算类任务。
encro
2023-11-22 14:09:05 +08:00
问题:

1 ,你 1 分钟接收 2000 条;
2 ,上游 3 分钟才能处理一条;
3 ,你得开几千个并发才能不阻塞。。。

即使你用了队列,也会造成队列阻塞,内存爆炸。。。。

4C8G ,开几千个并发,应该是不够的。。。


至于 cpu ,需要 cpu 干啥呢。。。

直接存队列就好了,最简单的队列就是 redis 。还有 ZeroMQ
RedBeanIce
2023-11-22 14:51:49 +08:00
1 ,你 1 分钟收到 2000 条数据
2 ,你需要进行分类,每个分类有不同的处理方式,转成不同的报文
3 ,上游一次只能接收一条数据
4 ,上游处理数据很慢

问题,
1 ,在线率是什么
2 ,处理任务的程序是指你写的这个程序么,为什么要一直重启

根据现有信息,整理的方案如下
1 ,下游客户端给你提供数据,你直接入库
2 ,你的程序直接去数据库获取,一次性获取一条
3 ,按照分类业务,处理成固定的报文,推送给上游,
4 ,如果上游上次未处理完成,你的程序不要做新的推送数据处理,继续等待。
ragnaroks
2023-11-22 15:04:29 +08:00
你的上报,开几个 singleton service 不就好了么,里面用自锁 timer 幂等。看你的描述和我以前做支付接口差不多,但是那会用 core 3.1 跑 200 多并发(每分钟 12000 请求),只用了几个 1t2g 的轻量云做高可用,外加一个 2t4g80g 的轻量数据库。
009694
2023-11-22 15:28:14 +08:00
如果你的上报数据平均速率大于你的上游处理速率的话 你就是想破天也没用 再大的消息队列都承受不住这个注定要爆炸的业务
pming1
2023-11-22 15:35:42 +08:00
为什么要接这样的需求呢?生产效率远大于消费,不管用什么消息队列还是数据库,上游按这个效率,永远都接收不完。
drymonfidelia
2023-11-22 15:50:28 +08:00
@009694
@pming1
@RedBeanIce 可能是我没描述清楚,是一个请求只能带一条数据,但是三分钟响应的上游也是可以并发的
drymonfidelia
2023-11-22 16:17:00 +08:00
@drymonfidelia 并且并发数量不会导致响应时间叠加
DTCPSS
2023-11-22 16:23:04 +08:00
justFxxk2060
2023-11-22 16:37:13 +08:00
削峰填谷,要啥数据库?
瓶颈是在数据库上,把数据库取消就好了,方案随意选
justFxxk2060
2023-11-22 16:40:19 +08:00
哎,一想着这种问题丢到 ai 里面就能得到正确答案,就觉得程序员前途真是渺茫
justFxxk2060
2023-11-22 16:47:54 +08:00
而多年的研发经验也就是帮着具体分析下,哪儿导致 CPU 占用率最高?
1 、数据库轮询?
2 、频繁写入导致 IO CPU 高?
3 、SemaphoreSlim 和异步 Task 控制并发让线程管理不当,增加 CPU 使用率?

完全没啥用,按照 AI 的答案直接取消数据库 拿 redis 或者第三方消息队列 梭哈就好了
popvlovs
2023-11-22 16:49:14 +08:00
disruptor 有.net 版吧,对应 N producer M consumer 这个模式,M 看样子可以大一点
drymonfidelia
2023-11-22 16:54:33 +08:00
@RedBeanIce
在线率是指我接收客户端数据的这个接口,不能动不动挂掉
经常重启是因为经常要改分类逻辑
xiangyuecn
2023-11-22 16:56:47 +08:00
“开 5 个线程在数据库里扫描这些队列,发现新的任务就开一个异步 Task 上报”

神奇的逻辑。直接给结论:1 个线程足够,立即释放出 80% cpu 。
Kinnice
2023-11-22 17:21:04 +08:00
为啥使用数据库?增加个队列中间件,例如 RabbitMQ ,或直接买队列云服务
1. A 程序为 提供接受客户端数据的接口:接受请求 => 校验 => 入队列
2. B 程序为 从队列取数据,并根据分类发送请求

按这个,B 更新不需要 A 不用停机,且热更新 B 都可以
drymonfidelia
2023-11-22 17:33:32 +08:00
@Kinnice 如果 B 取了数据还没上报就被我关掉了,怎么防止这条数据丢掉?
Kinnice
2023-11-22 17:36:43 +08:00
@drymonfidelia #17 手动 ack 呀
接受到消息 => 处理(推荐的实践是开始发消息了就认为已经处理,而不是发成功后再 ack[太久了],如果没有发成功,可以重新发条消息到 mq 中,再次消费) => ack
hez2010
2023-11-22 19:35:00 +08:00
其实最简单的方法直接弄个 ConcurrentQueue 就行了,没必要从数据库轮询。在存数据库的时候顺便往 ConcurrentQueue 里面塞一份直接用就行了。

```cs
class Worker
{
public static readonly ConcurrentQueue<T> Queue = new();
public static readonly SemaphoreSlim Semaphore = new(...);

async Task ProcessAsync(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
await Semaphore.WaitAsync(token);
while (Queue.TryDequeue(out var entry))
{
// ...
}
}
}
}
```
kokutou
2023-11-22 21:58:18 +08:00
在线 debug 看看呢,说不定 CPU 时间是在莫名其妙的地方

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/994142

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX