大量查询数据任务,通过 kafka 缓冲后入库,如何解决大任务阻塞小任务的情况。

2022-03-25 11:56:37 +08:00
 whats
RT

每个任务的流程为:查询服务( N 个实例)->kafka 集群( 2 个 topic )->入库服务( N 个实例)->数据库

目前情况:
1. 查询服务从不同来源获取数据,任务数量比较多,1 天任务量可以到 10w 级别
2. 任务查询的数据量差异比较大,有 1 亿条的,少的也就几十条,查询结束前无法提前知道数据量。
3. 单个任务的查询是分批进行的,比如每查 1w 条提交一次给 kafka 的 2 个 topic (少于 1w 条写 topic1,等于 1w 条写 topic2 )。
4. 入库服务消费 2 个 topic ,写入结果库。

遇到的问题:
1. 当有大任务写了大量数据到 kafka 后,一些中小型任务被阻塞,需等大任务入库完成才能入库。

想请教各位大神这类场景有什么策略解决大任务阻塞小任务入库的问题,或者有没其他 mq 替换 kafka ,支持创建大量 topic ,每个任务对应一个 topic ,且不影响 mq 性能。
2531 次点击
所在节点    程序员
20 条回复
q1angch0u
2022-03-25 11:57:52 +08:00
优先级队列
小任务->高优先级
大任务->低优先级
q1angch0u
2022-03-25 11:58:41 +08:00
无视我
q1angch0u
2022-03-25 12:00:38 +08:00
不对,我没理解
[想请教各位大神这类场景有什么策略解决大任务阻塞小任务入库的问题,或者有没其他 mq 替换 kafka ,支持创建大量 topic ,每个任务对应一个 topic ,且不影响 mq 性能。]
每个任务对应一个 topic 的目的是什么呢?
whats
2022-03-25 12:05:28 +08:00
@q1angch0u 这样任务之间的影响相对较少 每个任务在查询环节创建 taskid 为名称的 topic ,并通知入库服务去消费该 topic , 任务完成后可以删除该 topic
q1angch0u
2022-03-25 12:10:59 +08:00
@whats kafka 有这么用的吗……
lxz6597863
2022-03-25 12:28:24 +08:00
for {
task = get_task_form_small_topic()
if task {
do(task)
continue
}
task = get_task_form_big_topic()
do(task)
}

你已经分大小任务 topic 了,这样不行吗?
gejigeji
2022-03-25 14:11:01 +08:00
小的优先级高的新建到不同 topic , 让分一些入库服务专门消费这些 topic
season8
2022-03-25 14:29:15 +08:00
你这个查询做两件事:
1. 查到不同源的数据入库。
2. 返回数据条数。

这样用 kafka 确实有点奇怪

1. 不能直接计算条数,然后写 kafka 么?
2. 能不能实时从数据库取出条数,写了几条就能拿到几条。
3. 数据预查预写
sss495088732
2022-03-25 14:35:25 +08:00
加 topic 成本最低.
pengtdyd
2022-03-25 14:55:28 +08:00
为什么会阻塞?大概率是大批量写入占用了磁盘 iIO

仅供参考:
1.挂载多个磁盘,然后配置多个 data 目录
2.topic 分区

ps: 1.我不理解什么叫”并通知入库服务去消费该 topic “这句话,topic 有数据就自动消费就好了 offset 会自动记录,为啥要通知??
2.奇葩场景、奇葩用法
dongtingyue
2022-03-25 17:08:55 +08:00
槽点太多。。。。
大任务大部分都是大于 1w 的都是到 tp2 ,小任务到 tp1 ,消费从 tp1 和 tp2 消费怎么可能会被大任务阻塞?
SbloodyS
2022-03-25 17:10:53 +08:00
1 亿的数据对于 kafka 来说挺小的吧....10 亿一天都没多少,没理解为什么要这么用
paradoxs
2022-03-25 17:18:37 +08:00
我很好奇,你现在这样设计,那用户多久才能得到查询结果?

到底是什么业务场景?
liyunlong41
2022-03-25 17:20:12 +08:00
给所有任务的入库设置超时时间,超时的任务 cancel 掉,超时的视为大任务,大任务单独放到重试队列里,这样不会阻塞中小任务。
java253738191
2022-03-25 20:05:20 +08:00
rabbitmq 不是支持优先级队列吗?
Jooooooooo
2022-03-25 21:58:46 +08:00
分开消费呗.
leafre
2022-03-25 22:21:23 +08:00
分批生产到两个 topic ,不同消费者组消费,怎么会阻塞呢,我想到的原因就 broker 性能实在太差,或者根本没形成 Cluster
jhdxr
2022-03-25 22:48:42 +08:00
『需等大任务入库完成才能入库』

你这儿的入库指的是你 kafaka 后面的那个消费者(入库服务)吗?那既然这样为何不增加一些消费者的实例?
zmal
2022-03-25 23:25:10 +08:00
1. "1w 条提交给 topic1,小于 1w 条提交给 topic2",分两个 topic 是有业务含义吗?如果没有的话这个逻辑是很有问题的。
2. "1w 条写 topic"是在 kafka 里写了 1w 条,还是写了一条,里边放了 1w 条数据?如果是后者,这样做是非常有问题的。
3. 你的 kafka 集群有多大?该 topic 分区有多少?听起来好像只有一个分区?有分区逻辑吗?比如你完全可以根据数据量写到不同的分区,而不是创建两个 topic 。
4.至于你说的“每个任务对应一个 topic ,用完删除”的想法更是有点异想天开。首先 kafka 已经是吞吐量最高的 mq ,且吞吐量不受存储数据量影响。其次,“用完删除”没有意义,高性能的分布式数据库 mq 等基本都是标记删除,而且高吞吐 mq 的数据都是放在磁盘的,删除 topic 只是个手动腾出磁盘空间的操作,有什么意义?

总之你遇到的问题不是 kafka 的问题,也不是换个 mq 可以解决的问题。看看 kafka 的性能瓶颈在哪里,可能是分区、网络 io 、内存之类的。
joesonw
2022-03-25 23:47:27 +08:00
请使用消息队列。kafka 适合有序消息流处理。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/842819

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX