V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
qviqvi
V2EX  ›  数据库

两个线程一起从表中取数据,表和 sql 如何设计呢?

  •  
  •   qviqvi · 141 天前 · 2027 次点击
    这是一个创建于 141 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一个任务表,两个线程同时每次取一条数据执行业务逻辑,然后把这条数据标记为完成或暂时放弃,暂时放弃的后面会被再次取出

    要求同一条数据不能被多个线程同时取出

    求优秀的设计方案

    32 条回复    2024-06-21 13:39:17 +08:00
    rekulas
        1
    rekulas  
       141 天前
    取数据的时候加行锁就行了
    tclm
        2
    tclm  
       141 天前
    主键是数值吗?确定只有两个线程吗?
    如果都是的话,一个线程只处理单数,一个线程只处理双数。
    Vegetable
        4
    Vegetable  
       141 天前
    1. 行内有字段能识别当前正在处理中
    2. 取的时候加锁
    gitdoit
        5
    gitdoit  
       141 天前
    只有一个线程负责交互任务表, 其他线程再和这个线程交互
    coder001
        6
    coder001  
       141 天前
    搞个并发队列,不管是进程内还是进程外,工作线程去队列取工作项
    主线程负责从数据库读出东西放进队列,工作线程拿到任务就先标记一下立即更新再开始执行,遇到放弃(需要再次执行)的状况主线程里再次拿出投进队列
    test0x01
        7
    test0x01  
       141 天前 via Android
    一个单数 ID 一个双数 ID
    MoYi123
        8
    MoYi123  
       141 天前   ❤️ 2
    update table set status = 'work' where id = 1 and stats = 'ready'

    然后看 update 的返回值是 0 还是 1 即可. 和 redis 的 setnx 差不多的做法.
    rainxt
        9
    rainxt  
       141 天前   ❤️ 1
    正在做类似的需求,额外用了 redis 做消息队列,1 个任务往消息队列里写,几十个业务进程从消息队列里拉取任务,处理完回写数据库标记完成,消息队列里空的时候再去查数据库里还有没有要处理的往里加
    Karte
        10
    Karte  
       141 天前
    1. 创建一个带有并发控制所的队列.
    2. 每个线程从该队列中获取数据.
    3. 在获取数据时, 如果队列为空, 则由当前线程请求数据库, 批量读取数据丢入队列.
    4. 数据暂时放弃时, 将数据丢入队列底部.

    这样就好了. 在读取数据时通过锁做到数据获取无冲突, 而且不会有重复的数据从数据库中读取; 另一方面也降低了 IO 请求.

    这个方案的缺点在获取数据时会有锁的存在, 如果数据处理比较快, 可能会遇到其他线程在等待其中一个线程读取数据库的返回.
    8355
        11
    8355  
       141 天前
    1.最简单的单进程扫表推消息队列
    2.update task set status=1 (运行中) where id = xx 根据影响行数判断是否哪个线程可以抢占执行中状态。
    8355
        12
    8355  
       141 天前
    @8355 同 8l 少了待执行状态 一个意思
    mifly
        13
    mifly  
       141 天前 via Android
    可以利用数据库的 update 语句的锁来实现
    生成唯一的 Lockkey
    Update sometable ser lockkey=lockkey where ... Limit 10
    Select ... From sometable where lockkey=lockkey
    Karte
        14
    Karte  
       141 天前
    方案二:

    1. 创建一个对象, 对象记录一个查询位置, 查询数量.
    2. 每个线程通过加锁读取该对象获取位置和数量, 并在获取后更新当前查询位置. 释放锁
    3. 通过之前获取到的查询位置, 查询数量从数据库获取一段范围的数据.
    4. 将数据丢入线程本地队列, 依次处理.
    5. 对暂时放弃的数据 id 存入 步骤一 对象中. 采用 CopyOnWrite 方法更新对象值.

    这个方案将查询 IO 放入到了每个线程, 线程只需要在获取当前处理位置时加锁就好了. 这样锁的时间十分短暂, 几乎可以做到 "无锁" 的情况. 但是需要注意线程数量, 查询数量参数; 尽可能的将线程数拉小, 查询数量拉大, 这样线程可以更专注处理数据, 而不是一直等待 IO 返回.
    Karte
        15
    Karte  
       141 天前
    方案三:
    直接用现成的消息队列, 把要处理的消息丢到队列里. 消费者直接多线程从消息队列中获取数据处理. 这个开发方案是可以做到代码量最小.
    CEBBCAT
        16
    CEBBCAT  
       141 天前
    加锁哪有串行得劲,模仿 Redis 一致性哈希,加一列 int 给任务打标签,你干一三五,我干二四六,加一个 worker 捡漏。或者用 MQ 传递 ID ,让已经成熟的 MQ 来干这事


    *然后发现让楼上抢答了,气😡!
    nothingistrue
        17
    nothingistrue  
       141 天前   ❤️ 1
    楼主你要得,应该不只是「同一条数据不能被多个线程同时取出」,而且还要是,「一条数据被一个线程处理时,其他线程顺序取下一条数据处理」

    @rekulas #1
    @Vegetable #4
    取数逻辑是未处理数据中的第一个,行锁管不了(意味着非序列级别的事务也管不了),表锁和序列级别事务又管得太多了——写回前都会阻塞另一个线程取数,这样的话无论多少线程都跟单线程没啥区别。

    @MoYi123 #8 你这个其实就是变相的表锁

    我这没有方案,不过可以确定单靠数据库是解决不了了。上面两个用队列的方案,看起来就星了。
    yjhatfdu2
        18
    yjhatfdu2  
       141 天前   ❤️ 2
    begin;
    select * from tbl for update skip locked limit 1;
    # 处理逻辑
    update from tbl where xxx=xxx set processed=1;
    commit;
    yjhatfdu2
        19
    yjhatfdu2  
       141 天前
    现在的程序员都不会开事务了吗
    Vegetable
        20
    Vegetable  
       141 天前
    @nothingistrue 锁是为了修改状态标记占坑,不是处理任务的时候锁着。
    qviqvi
        21
    qviqvi  
    OP
       141 天前
    @yjhatfdu2 感觉这个是正解
    rqxiao
        22
    rqxiao  
       140 天前
    @yjhatfdu2 第一次知道
    csrocks
        23
    csrocks  
       140 天前
    number of task = N

    task_0: select * from t where mod(id_hash, N)=0
    ...
    task_N: select * from t where mod(id_hash, N)=N-1
    nothingistrue
        24
    nothingistrue  
       140 天前
    @Vegetable #20 看好取数的查询条件,不是 select * from t by id ,而是 select * from t where stauts= 未处理(以及未锁定) 中的第一条。两个线程之间的共享对象,不是一行数据,而是所有未处理数据,你要加锁或者占坑,也只能这么占。加锁之后的结果就是,多个线程只能轮流处理,跟单线程一个效果,无法并行。
    Vegetable
        25
    Vegetable  
       140 天前
    @nothingistrue 我没看明白你说的是什么,锁只是确保状态只会被一个线程修改,和 MoYi123 说的判断影响行数的方案本质是一样的,锁的持续时间也就几毫秒而已,哪里和单线程一个效果
    nothingistrue
        26
    nothingistrue  
       140 天前
    @yjhatfdu2 #18
    https://stackoverflow.com/questions/53288584/select-for-update-skip-locked-in-repetable-read-transactions
    看下 skip locked 的效果,可以重复加锁,但是事务提交的时候要判定数据有没有被更改过,如果已经更改,那么本事务要失败。这就是个乐观锁,先提交的成功,后提交的失败,使用场景是并发修改的机率不高的场景。如果你在多线程并发场景中用乐观锁,那没跑几步就会只剩一个线程活着,其他线程全部出错终止(加了出错之后重启机制,效果会更差,绝大部分性能将被浪费在失败重启上)。

    事务不是一句「开事务」就完事大吉的。什么时候开事务,开什么样的事务,都是有考究的。最常见的错误,就是认为开了事务就不怕并发数据冲突了。事务的隔离性是分级别的,序列化级别才能保证完全不出现并发数据冲突——但同时也没并发了。常用的隔离级别是可重复读,只在并发数据是确定的单行/多行数据的时候才能保证无并发冲突,当并发数据是表,或者表中不确定的数据时,还是要加锁处理并发冲突的。

    而怎么加锁,也是有考究的。楼主的场景是没法加锁的,因为它的并发数据是「 stauts = 未处理」的表,不是「主键 = xxx 」的行。加常规悲观锁就让线程排队,等同于失去并发。加乐观锁,就是开玩笑。

    @qviqvi #21 不要只找简单答案,容易错。Karte 那三个方案才是正确的。
    nothingistrue
        27
    nothingistrue  
       140 天前
    @Vegetable #25
    update table set status = 'work' where id = 1 and stats = 'ready'
    id = 1 怎么来的:是 select id from table where stauts= 未处理 limit 1 得到的
    那要是 select 语句跟 update 语句之间,其他线程已经提前做了 update 呢:这个 update 会返回 0 ,本线程处理失败
    接着呢:如果你当失败处理,那么这个线程没了;如果你重新 select ,那么大部分时间要消耗在 select -> update return 0 -> select 的死循环中了。

    我对 @MoYi123 第一个回复确实是错了,不是表锁,而是乐观锁。但乐观锁不解决问题,详见我上面对 yjhatfdu2 的回复。

    楼主这个场景要加锁,你只能对 「 where stauts= 未处理 」,或者「全表」加锁,加了之后多线程从并发变排队,形同单线程。
    glacer
        28
    glacer  
       140 天前
    id mod 线程数
    MoYi123
        29
    MoYi123  
       140 天前
    @nothingistrue
    id=1 怎么来的? 当然是 select * from t where stats = 'ready' limit 10 查出来的啊,
    每个 worker 先拉一个 todo list, 然后一个个去领任务, 成功领到了就开始干, 这有啥问题啊,

    难道你用消息队列就能凭空变出来这个 id 了? 不也是这样查的
    linyinma
        30
    linyinma  
       140 天前
    明明在应用层可以处理的,偏偏要扯到数据库,: 一个线程批量取,多个线程消费, 中间加个锁消费完了再去取.....
    nothingistrue
        31
    nothingistrue  
       140 天前
    @MoYi123 #29 消息队列读写分离了,可以单独只对读那一瞬间加锁。10 楼说得明明白白。
    MoYi123
        32
    MoYi123  
       140 天前
    @nothingistrue 回答一个问题, 往队列里塞数据的进程是不是单点? 或者你要怎么加锁?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1706 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 16:48 · PVG 00:48 · LAX 08:48 · JFK 11:48
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.