laravel 的做法是同时维护三个队列: 主队列 (list), 备份队列 (reserved, zset) , 延时队列 (delayed, zset).
消息从 list 里 lpop 出来之后会根据超时时间再次存放到备份队列里去, 这个操作用 lua 实现:
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L54```
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
redis.call('lpop', KEYS[3])
end
return {job, reserved}
```
而在从主队列 pop 之前, 会根据当前时间从备份队列和延时队列两个 zset 中取出消息 rpush 到主队列中.
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L167同样也是使用 lua 进行操作
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L105```
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
-- Push a notification for every job that was migrated...
for j = i, math.min(i+99, #val) do
redis.call('rpush', KEYS[3], 1)
end
end
end
return val
```
同时为了避免重复消费, 在消息消费成功后, 会手动从备份队列删除备份消息.
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Jobs/RedisJob.php#L84在每次 pop 出消息并进行消费之前, 会注册一个 timeoutHandler, 通过计时器来实现中断超时任务
https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L111所以, 当消费过程中发生异常退出或是超时中断后, 会根据重试时间, 从备份队列里面取出备份消息重新消费.