延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者。比如我们现在发送一条延时 1 分钟的消息,消息发送后立即发送给服务器,但是服务器在 1 分钟后才将该消息交给消费者。
这种延时消息有一些什么应用场景呢?比如在电商网站上我们购物的时候,下单后我们没有立即支付,这个时候界面上往往会提醒你如果 xx 分钟还未支付订单将被取消。对于这么一个功能如果不使用延时消息,那我们就需要使用类似定时任务的功能,比如每分钟我们跑一个定时任务对订单表进行扫描,将未支付订单扫出,如果从下单时间到现在已经超过了 45 分钟则将该订单取消。但是定时扫描有一个问题是效率不高,如果订单很多将会严重的影响 db 的性能。如果使用延时消息就没有这样的问题了,只需要发送一条延时 xx 分钟的的延时消息即可,在消息里携带有订单号,xx 分钟后消费者收到该消息检查对应订单状态做出对应处理,这种方式将大大减轻对 db 的压力,实现起来也更优雅。
上面描述的是一种延时时间固定的场景,还有一些是要指定时间执行。比如买了一张一周后北京去东京的机票,那么在乘机时间到来之前可能要发送数次提醒的短信给用户,那么我们也可以在用户下单后发送一条延时消息,延时到乘机时间之前发送。
有了场景,我们首先来分析一下需求:
有了上面的限定,我们来讨论一下延时消息的设计。
延时说白了就是一个定时任务的功能,指定一个未来的时间执行消息投递的任务,时间到了再将消息投递出去。
如果遇到定时任务的场景往往会有这么几个方案来考虑:
上面这三种方式都是基于内存的数据结构,也就是我们得将所有任务都放到内存里,如果用在延时消息上,显然是不现实的,实际上也是没有必要的。如果这个消息是几个小时后需要投递,我们为什么需要现在就将其加载进来一直占着内存呢?看起来我们只需要提前一段时间加载未来某段时间需要投递的消息即可。比如我们将消息按照一个小时为一个段,每次只加载一个段的消息到内存里。其实我们可以用一个很形象的比如来描述这种结构:两层时间轮(hash wheel)。第一层 hash wheel 位于磁盘上,精度较粗,每个小时为 1 个刻度。第二层 hash wheel 位于内存里,只包含第一层 hash wheel 一个刻度的数据,精度为 1 秒。
但是我们怎么去加载这些需要的消息将其组织为第一层 hash wheel 呢?消息接收后存储到一个顺序的 log 文件,消息接收的顺序和消息的延时时间之间是没有任何关系的。比如现在收到了一条消息,是 1 个小时后需要投递,稍后收到一条消息可能是 5 分钟之后投递。我们加载时候是按照延时时间进行加载的,比如我们需要加载未来一个小时需要投递的消息:
比如上图所示,3 seconds 是最近要投递的消息,然后是 5minutes,而排在最头上的是 1 个小时后要投递的。我们不可能每次要预加载的时候都从头扫描一遍,然后将需要的消息加载。
怎么办呢?对于需要快速查找,我们肯定会想到建立索引。那么我们只需要按照我们的预加载的时间段划分索引即可了,比如我们建立 2019021813, 2019021814...这样的索引文件,文件里每一个 entry 就是一个索引,该索引包含以下结构:
index:
schedule time: int64
offset: int64
offset 是指向前面 log 的偏移,而 schedule time 是消息的到期时间。这样我们每次只需要加载一个段(比如 2019021813)的索引到内存就行了,内存中的 hashwheel 根据 schedule time 决定到期时间,到期后根据 offset 读取到消息内容将消息投递出去。
这个存储结构到这里基本上就 ok 了,但是存在一个落地实施的问题(磁盘的空间是有限的):如果一开始收到一条消息是 6 个月之后投递的,后面收到了一些一个小时内投递的,实际上只要消息投递后我们就可以将消息删除了,这样可以大大的节约内存空间,但是因为 log 的头部有一条 6 个月之后的消息,所以我们还不能将该 log 删除掉,也就是至少 6 个月我们不能删除消息,除非我们按照消息来删除,也就是将 6 个月后的消息保留下来,而一个小时内已经投递了的消息删除掉(一种 compact 机制),但是这种实现就变得很复杂。
其实换个方式就简单了,在前面我们按照每个时间段建立索引文件,那么如果我们不仅仅建立索引呢?也就是索引文件里不仅仅是索引,而是包括完整的消息:消息收到后先进入一个按照接收顺序的 log(qmq 里称之为 message log),然后回放该 log,按照 log 里每条消息的投递时间将消息放到对应的时间段上(qmq 里称之为 schedule log),这样只要回放完成后 message log 里的消息就可以删除了,message log 只需要保留很少的内容,而 schedule log 是按照投递时间段来组织的,已经投递过的时间段也可以立即删除了。通过这种变化我们顺利的解决了磁盘占用问题,另外还有一个副产品:读写分离。这种方式我们在如何用不到两千块大幅度提升 QMQ 性能里已经有过介绍,我们可以将延时消息里的 message log 放到小容量高性能的 SSD 里,提高消息发送的吞吐量和延时,而将 schedule log 放到大容量低成本的 HDD 里,可以支撑时间更久的延时消息(下图即延时消息的存储结构):
在这里还有一些具体实现细节需要处理。虽然我们按照每个时间单位重新组织了消息(schedule log),但是在该时间段内的消息并不是按照投递时间排序的。比如每个小时为一个时间段,那么可能第 59 分钟的消息排在最前面,而几秒内需要投递的排在最后面,那如果某个时间段内的消息正在投递时应用突然挂掉了,那么再次恢复的时候我们并不能准确的知道消息投递到哪儿了。所以我们增加了一个 dispatch log,dispatch log 在消息投递完成后写入,dispatch log 里每一个 entry 记录的是 schedule log 里的 offset,表示该 offset 的消息已经投递,当应用重启后我们会对比 schedule log 和 dispatch log,将未投递的消息找出来重新加载投递,dispatch log 相当于一个位图数据结构。
在我们决定加载某个时间段消息时(正在加载的时间段称之为 current loading segment),我们首先会取得该时间段文件的最大 offset,然后加载只会加载这个 offset 范围内的消息(qmq 内称之为 loading offset),而加载过程中如果又来了该时间段内消息,那这个消息的 offset 也是>loading offset:
if( message.offset in current loading segment && message.offset > loading offset){
add to memory hash wheel
}
实际上我们并不会将 schedule log 里完整的消息加载到内存,只会加载索引到内存,根据前面的介绍,每个索引是 16 个字节(实际大小可以参照代码,略有出入)。假设我们使用 1G 内存加载一个小时索引的话,则可以装载 1G/16B = (1024M * 1024K * 1024B)/(16B) = 67108864 条消息索引。则每秒 qps 可以达到 18641(67108864 / 60 / 60)。如果我们想每秒达到 10 万 qps,每个小时一个刻度则需要 5493MB,如果觉得内存占用过高,则可以相应的缩小时间段大小,比如 10 分钟一个时间段,则 10 万 qps 只需要占用 915MB 内存。通过计算可知这种设计方式还是在合理的范围内的。
qmq 示例代码-github,本人在基础上做了注释!