求助 Java 大量任务分布式处理的问题

2022-04-26 12:30:59 +08:00
 yesterdaysun

问题是这样, 现在系统中有大量去和第三方 API 交互的任务, 比如有 1000 个用户, 每个用户又有各自 1 万个小的记录去和第三方 API 慢慢交互, 或者没有那么多记录但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络, 之前的方式就是一个线程池, 把所有大小任务塞进去, 但是这个线程池大小很难搞, 多了的话, 有时会突然来一堆任务占住 CPU 和数据库, 少了的话, 一大堆任务又阻塞住.

现在想搞成分布式好几台机器一起跑, 考察了一下方案, 有点迷惑:

  1. 一种是分布式任务队列, 看到一个 Celery 好像是这种, 但是这个 python 的, 我想要 Java 的, 结果没找到
  2. 一种是任务调度框架, quartz, xxljob 这种, 感觉我想要的更靠近这种, 但是又有点迷惑, 比如感觉我这种需求适合"分片广播"这种任务, 比如我把 1000 个用户的任务分片到 3 台机器, 但是然后每台机器上的任务为每个用户再单独为他名下的 1 万条记录自己做线程池请求? 或者我把任务拆到单个小记录的级别, 那岂不是得成千上万的 trigger, 然后任务调度又一般是一个主 job, 然后传参数这种, 那比如我要确保一个时间只有一个用户的任务在跑, 怎么做这个限制, 全要自己在任务中处理吗

所以, 其实就是我想找一个比较现成的框架, 能处理超长的任务队列, 分布式, 并发的执行, 可以自动削峰填谷, 有一些任务自动处理, 比如重试, 故障转移等等, 又能够有一些保证一致性的机制, 比如按 job+某个参数确保不会重复执行, 还能程序方式发起调度, 而不是在某个管理后台手动编辑

我想知道这样的东西存在吗, 还是必须自己实现, 求各位大佬赐教

4813 次点击
所在节点    Java
31 条回复
lmshl
2022-04-26 18:28:37 +08:00
@yesterdaysun 以上技术方案中,综合代码量和开发难度来看,从易到难依次应该是
纤程 >> Akka Stream > nio-pool > xxjob/scheduler > 动态线程池屎上雕花 >> akka cluster sharding >> akka cluster without sharding

纤程是真的简单,你这需求 20-50 行左右就完事了,不就是个
flow = post(...) >> (sleep(1.minutes) *> check(xxx)).retryWhile(isCompleted) >> retrieve()
然后 tasks.foreachPar(<你想开多大并行>)(flow)
的事
outoftimeerror
2022-04-26 18:33:29 +08:00
我也写 java ,不过你这个需求让我选型的话,我会用 golang (goroutine+chan)+ redis
XhstormR02
2022-04-26 19:24:21 +08:00
@lmshl java 的纤程 Quasar ,最近一次更新是 2018 年,都好多年没更新了 https://github.com/puniverse/quasar ,倒不如用 kotlin 的 coroutines
https://github.com/Kotlin/kotlinx.coroutines/
dddd1919
2022-04-26 21:27:40 +08:00
显然是该上 MQ 了,把用户放到队列,由消费端去挨个处理用户任务,如果单个用户跑的话配一个消费任务就够了

强业务需求建议 RabbitMQ/RocketMQ
lmshl
2022-04-26 21:41:39 +08:00
@XhstormR02 反正我说的也不是 Java 🐶
其实上面写的是 Scala 伪代码 🐶
档燃,Kotlin 也不错,起码有 suspend/await 可以用,不像 IO Monad 要切换编程思维
mind3x
2022-04-27 02:40:39 +08:00
建议看看 Uber 的开源框架 Cadence ,支持 Go 和 Java 。
https://cadenceworkflow.io

上手会有一点门槛。
ymmud
2022-04-27 12:02:07 +08:00
@lmshl 看着有点像 zio
lmshl
2022-04-27 20:37:23 +08:00
@ymmud 兄弟慧眼👍
yhvictor
2022-04-27 23:36:07 +08:00
协程应该满好弄的。
nio 有点难写。

但我觉得吧,楼主这工作分两部分,第一部分是网络 io 等待准备好,第二部分是处理数据。
第一部分是 io 密集,第二部分是 cpu 密集。
所以如果拆成两部分,第一步就可以线程开满,直到数据准备好,放入一个 queue 。
第二步开线程约等于或小于 cpu 核心数,从 queue 中读准备好的数据源并处理。
齐活。
byte10
2022-04-28 09:35:05 +08:00
@polarbear007 是的,一个 NIO 就解决啦,花里胡哨的,想一些错误的方案。

首先 IO 密集型,线程开到 1000 个都不是问题,线程在 IO 的时候不占用 cpu 。当然可能同时响应时就会出现 cpu 拉满,所以 cpu 使用率就是锯齿形的,不好分析瓶颈。

你这个核心问题就是 IO 时间不确定,无法确定最大线程数。你可以看我的那个教程,https://www.bilibili.com/video/BV1FS4y1o7QB , NIO 如何无视 IO 时间,解决线程池大小的问题。你千万不要搞分布式,分布式是单机 cpu 性能出现瓶颈才干的事情,你这个场景一个树莓派 4B 就能完成。一定要切记,这些很基本很基本的问题,不要把事情想复杂了,在这一点犯错的人太多了。

至于多个异步转同步问题,countdownlatch 和 cyclicbarrier 都能很好解决。

找到最核心的问题,解决核心问题,加油。
wolfie
2022-04-28 15:37:50 +08:00
做过类似功能,自己实现了一个,参考了 xxl-job 的表结构。

# some_table
- id
- status ( running 、succeed 、failed )

# some_table_job (频繁扫描表)
- some_table_id
- is_running
- next_run_time (索引字段)
- last_run_time (索引字段)(几分钟一扫描,防止异常结束,长时间未完成)
- version (版本号,乐观锁)

# some_table_job_log
- some_table_id
- some_table_job_id
- result ( succeed 、failed )

1. 新增 some_table ,同时新增 some_table_job
2. 定时任务扫描 some_table_job ,拉取任务数据
3. 任务执行完
- 成功:写入 some_table_job_log ,删除 some_table_job ,回写 some_table 状态。
- 失败:写入 some_table_job_log ,计算 some_table_job 下一次执行时间。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/849331

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX