redis 队列推送消息的疑问🤔️

2019-05-09 15:58:16 +08:00
 uoddsa

有个需求,需要对系统的用户进行全局推送,包含定时推送。 因为怕重复推送了所以打算是待推送的任务发在 redis list 里面。定时任务去队列取未推送的任务。可是每次取多少,怎么取。 还是我要用发布订阅。

6864 次点击
所在节点    PHP
29 条回复
coffeSlider
2019-05-09 16:10:31 +08:00
取多少,怎么取不看你自己的业务吗?如果 job 是单线程,取数据就直接用 rpoplpush。
rpdict
2019-05-09 16:44:33 +08:00
用的 redis 的 zset,按时间排序的有序集合,取的时候就取 0~当前时间戳对应的值就行了
yxjn
2019-05-09 16:52:25 +08:00
redis 队列的缺点是需要自己写很多的异常补偿机制。毕竟 redis 本身不是作为一个完整的队列功能而存在的。
julyclyde
2019-05-09 16:56:04 +08:00
@rpdict zset 很容易慢的
iyaozhen
2019-05-09 16:56:10 +08:00
redis 的 pub/sub 严格说不是个队列,是广播,无法并发消费

用 list 就行,左进右出,一条条处理呗,可以多进程。还可以再建个重发队列,失败的丢进去。
reus
2019-05-09 17:04:40 +08:00
如果 redis 崩了呢?你怎么知道那些发过哪些没发过?
rpdict
2019-05-09 17:12:45 +08:00
@julyclyde 有序集合的好处就是读取的时候读第一个,如果时间没到就不用接着读了,节约的时间在这里
julyclyde
2019-05-09 17:25:39 +08:00
@rpdict 我是指 zset 的排序速度慢。尤其是没按顺序插入的时候
uoddsa
2019-05-09 17:32:33 +08:00
@iyaozhen 问题就在这里 怎么取队列里面的消息 while(true) ?还是有其他好的方法。
yxjn
2019-05-09 17:38:25 +08:00
@uoddsa blpop 看看能不能满足你的需求
lestat
2019-05-09 17:42:52 +08:00
@rpdict laravel 里面的延时队列好像就是这么设计的
rpdict
2019-05-09 17:54:58 +08:00
@uoddsa 是一个取舍,就好像数据库加了索引插入数据就会慢一些,但是读取会快很多,看需求是要更快的插入速度还是更准确的发送时间吧,有序集合的好处就是不用遍历所有,无序的好处就是插入快
rpdict
2019-05-09 17:57:26 +08:00
@lestat 我是参考了有赞的延时队列,感觉大家做法都差不多?感谢回复,我去看看 laravel 怎么实现的
Evilk
2019-05-09 18:29:37 +08:00
@yxjn 同意,redis 还是适合缓存,需要队列的话,还是选择专业的吧,比如 rabbitMQ
strive
2019-05-09 18:39:13 +08:00
可以用个存储过程把要推送的用户和消息放到任务表里面,再把任务表里面数据放到 redis 的 list 里面拿出来处理就可以了
brickyang
2019-05-09 18:44:47 +08:00
lovedebug
2019-05-09 18:58:27 +08:00
这种需求用 azure service bus 更好吧。redis 是有丢消息风险的。用 kafka 都好很多。
ericliu001
2019-05-09 19:08:25 +08:00
lpoprpush 看下这个命令
runnerlee
2019-05-09 19:35:27 +08:00
laravel 的做法是同时维护三个队列: 主队列 (list), 备份队列 (reserved, zset) , 延时队列 (delayed, zset).

消息从 list 里 lpop 出来之后会根据超时时间再次存放到备份队列里去, 这个操作用 lua 实现:

https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L54
```
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
redis.call('lpop', KEYS[3])
end
return {job, reserved}
```

而在从主队列 pop 之前, 会根据当前时间从备份队列和延时队列两个 zset 中取出消息 rpush 到主队列中.

https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L167

同样也是使用 lua 进行操作
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L105
```
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
-- Push a notification for every job that was migrated...
for j = i, math.min(i+99, #val) do
redis.call('rpush', KEYS[3], 1)
end
end
end

return val
```

同时为了避免重复消费, 在消息消费成功后, 会手动从备份队列删除备份消息.

https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Jobs/RedisJob.php#L84

在每次 pop 出消息并进行消费之前, 会注册一个 timeoutHandler, 通过计时器来实现中断超时任务

https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L111

所以, 当消费过程中发生异常退出或是超时中断后, 会根据重试时间, 从备份队列里面取出备份消息重新消费.
iyaozhen
2019-05-09 19:40:58 +08:00
@uoddsa 死循环就行了,做好异常处理。后台运行呗

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/562561

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX