RocketMQ 消费写入 MySQL 问题

2023-04-17 10:21:01 +08:00
 chenfang

公司需求是把项目中产生的数据,实时入库到 MySQL,这些数据分别属于不同的 table.

现在的方案是把产生的数据通过 tomcat -> RocketMQ 的一个 topic 中,然后启用一个 MQ 消费者组合并消息(如果不合并入库次数太多也会慢),然后 batch 入库,这个方案单表没有任何问题.

如果设计到多个表入库,因为 mq 消费是顺序读取 topic 中的消息,也就导致如果一个表需要入库的数据量大,那么入库时间就会长,这会导致整体消费变慢,从而导致实时表不实时的问题.

这就是我现在遇到的问题

  1. 入库 MySQL 的时间长,解决时间长的问题(这个我目前想不到解决办法)
  2. 如果解决不了入库时间长的问题,那么就让他不影响其他表入库

我自己想过解决方案,针对问题 2 的

方案 1

从一个消费者组 改为 按表名创建消费者组,但是可用性不高

按表名创建消费者组,使用过一段时间,因为表比较多,然后在一个 jvm(如果多个程序分别启动不同的消费者组也不好管理)中启动那么多消费者组,首先内存会占用很多(应该是每个消费者组都会缓存一定数量的消息),也就是内存的大小跟实时表的数量是等比上涨的.其次我查了一下说是消费者组数量也不是可以无限加的,目前我们是 110 多张表,也比较多了.考虑到之后还会加表,这个方案试行了一段时间就废掉了.

或者这个方案有没有优化空间?

方案 2

把入库时间长的表单独分一个消费者组,可用性比第一个还低

经过实践,不晓得哪个时间点,表里数据生产就会增多,所以这样也会导致可能突然就延迟了....很不可控

然后就是大家有什么思路么?万分感谢!

3245 次点击
所在节点    程序员
28 条回复
Seulgi
2023-04-17 10:30:57 +08:00
一个队列,消费时按表丢缓存,异步任务定时 10 秒,30 秒自己看能接受的延迟,记得设置缓存大小触发强制写。
举个例子,table_a ,table_b ,异步定时写任务 10 秒触发一次,表缓存强制 50mb 时触发写数据库操作。注意锁问题
chenfang
2023-04-17 10:36:12 +08:00
@Seulgi 这就是现在正在跑的版本,也是用了 jvm 的内存做缓存这种机制,还有强制触发写操作,但是还是不成,首先缓存强制触发不可以无限触发,比如同时入一个表的线程最多是 3 个,那么最后还是需要等待入库完成,从 MQ 消费读取消息也会触发等待...
Seulgi
2023-04-17 10:38:56 +08:00
表过多,建议将表名设置为 tag ,按 tag 切分消费组,多部署消费者提高消费速度,实时性较高的表,可以单独一个 tag 一个消费组。
举例:比如 table_a,table_b,table_c ,table_a 要求实时,部署 4 个 pod 只消费 tag:table_a 的数据,配置定时异步写任务为 1 秒或者为实时写。table_b,table_c 不要求实时,部署 4 个 pod 只消费 tag:table_b|table_c ,配置定时异步写任务为 30 秒
Seulgi
2023-04-17 10:39:59 +08:00
@chenfang 强制触发有等待,那说明你们的消费速度>写速度。要做的时将消费均摊。也就是 pod 要多部署。
AS4694lAS4808
2023-04-17 11:02:57 +08:00
有类似的场景,不过是 AWS 云上。
目前是 API (tomcat) -> Kinesis data stream (RocketMQ) -> Kinesis Firehose (Flink/Fluentd) -> S3 (MinIO)
-> OpenSearch (ES)
每日定时把 S3 的数据 Load 到 Redshift (Mysql)里,删除 ES 7 日以上 index

查询的时候 7 日以下 -> ES
7 日以上 -> Redshift (MYSQL 分库分表)

我们业务一开始也是读写一起走库的,但是显然只能支撑小数据量,而且后面查询也多了,就重构了一遍。
8355
2023-04-17 11:17:46 +08:00
问题 1 为什么不直接同步入库而采用异步入库的形式?就算使用异步入库也不用很多表或者说达到 110 张表都异步去写吧。
问题 2 现在每天写入量大概是多少?入库 MySQL 的时间长大概是多长 每条 sql 写入多少行?慢写入有多少个索引多少个字段?
chenfang
2023-04-17 11:19:17 +08:00
@AS4694lAS4808 很多项目都是读数据库表里的表,改成 ES 很难...费时费力估计老板不会同意去搞
chenfang
2023-04-17 11:27:57 +08:00
@8355
答案 1 单个消息里存不了太多数据,单次入库的时间加起来,是比批量入库的时间长的,还是跟表数据量太大有关系
答案 2 最大的表迁移到了 Doris 一次入库 50-80 万条左右,设置的是间隔 50s 一次强制写入,入库时间现在是 40-50s

慢写入有多少个索引多少个字段? 这个我不晓得..
8355
2023-04-17 11:34:40 +08:00
不管在方案 1 还是方案 2 都无法满足你的原始需求 项目中产生的数据,实时入库到 MySQL
当执行 update 操作时 你前台返回操作成功 但后台并不一定能绝对执行成功
当消息堆积时现开消费者时来不及的,会出现执行增删改操作后查询还是原来的值

项目复杂度会指数型上升 你需要特别小心的处理缓存数据和刷新的时机 为了这个方案你的代码量起码要翻一倍
所以你这两个方案都是及其糟糕的

主力削峰业务进队列 99%的业务应该同步读写 最简单的就是最好的也是最不容易出问题的
8355
2023-04-17 11:40:51 +08:00
@chenfang #8 那我的理解你的核心最大的问题是在写库时间 而不是为了解决这个问题再上面增加复杂度
哪怕你的队列消费的再快你的写库时间还是会很长,还会牵扯到刷盘策略问题,失败异常数据全丢问题更大。
pkoukk
2023-04-17 11:51:47 +08:00
如果非要把数据直接写入 MySQL ,那你这个场景的瓶颈在 MySQL 上啊
几十万的数据再 mysql 配置再高也得好几秒吧,就算你分 topic 了,其它写入也会被阻塞住
分库吧,按消息的某个 ID 字段分 HASH ,提高下游处理速度
liprais
2023-04-17 12:34:25 +08:00
搞个 flink 完事
dlmy
2023-04-17 12:37:46 +08:00
这个问题的核心是 “MQ 中消息消费速度远大于入库速度,并且需要实时入库到 MySQL”,如果一定要坚持 “实时入库”,那么不管你用什么方式解决,系统的复杂度相对都会变很高,也可能会带来新的问题,如果去掉 “实时” 这两个字,单就入库来说,会有很多解决方案。

标记一下,蹲大佬的 trade-off 方案
standchan
2023-04-17 13:22:37 +08:00
这个实时性有点麻烦啊,一个是很快的消息队列,一个是必须要进行 io 的数据库。要不试试 clickhouse ?但是换数据库要大改更麻烦
lolizeppelin
2023-04-17 13:30:07 +08:00
搞笑啊 所有 mq 只要多个消费者都可能出现写入顺序问题 还要实时
拍啥脑门写方案呀
hhjswf
2023-04-17 14:03:59 +08:00
谁做的选型啊,mq 本来就是拿来做异步,要求实时不是搞笑?
fkdog
2023-04-17 14:15:07 +08:00
入库的数据除了 insert 是不是也包括 update ?
如果是 update 是不是需要考虑并行消费顺序不一致脏写问题?
kafka 开多个 partition 不同表写入不同的 partition ,或者干脆不同表设计不同 topic 不知道能不能满足你的需求。
没用过 rocket ,不过应该有对应概念。
Red998
2023-04-17 14:25:15 +08:00
看业务对实时怎么看待了、技术角度都不是实时、只是延迟时间长短问题。 或者可以使用 binlog 方式去监听然后消费 MQ
、canal 了解下。
urnoob
2023-04-17 15:03:46 +08:00
OP 需要澄清下 实时 这个需求.是真的实时还是可以接受一定范围内的延迟.
真实时,那为啥不直接入库,就想常见的 CRUD 那样,何必放个 MQ
可延迟,那上面已经提供了一部分方案了.
对于方案 1 的优化
可采用高低搭配,量大的表部署更多消费者,硬件配置也更好.(OP 方案 2)
还取决于具体业务.
比如 tomcat 过来的数据进同一张表,数据之间是否有关联.
有关联 比如同一个设备位置更新. 但是设备之间无关联,那就按设备唯一标识符区分,确保对于同一设备入库先后顺序.
如果有关联可合并,那就在写入 MQ 前做一定的合并操作减少总量.
没关联 那就直(批量)入库.

很奇怪 OP 的方案 2
不晓得哪个时间点,表里数据生产就会增多
数据已经入库 但凡有个创建时间都应该知道什么时候变多,怎么会不知道呢.我觉得是需要具体深入挖掘的.


另外还有一种优化,就是 tomcat 那作为生产者是可以知道某几个表数据量增长的 它可以
临时增加消费者
用 MQ 通知所有消费者,对消费策略做一定更改.不分表的情况下将量多的表做批量写入 或者搭配方案 1,对这部分数据再写入 MQ,然后再按表(Tag)进行消费 这都能减小其他表的写入延迟影响..

但没有具体业务场景也只能说的泛泛

如果大到 Mysql 性能跟不上,那就肯定要做扩容了.
buddyy
2023-04-17 15:31:15 +08:00
建议你看一下 MySQL 所在机器的磁盘 IO 情况,是否出现了 IO 饱和的情况。如果 IO 饱和你必须得分库了。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/933067

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX