Redis 6 新 Feature 实现原理:多线程 IO

2020-10-02 12:43:01 +08:00
 RedisMasterNode

Introduction

Redis 从 6.0 版本开始引入了 Threaded I/O,目的是为了提升执行命令前后的网络 I/O 性能。本文会先从 Redis 的主流程开始分析,讲解网络 I/O 发生在哪里,以及现有的网络 I/O 模型,然后介绍 Threaded I/O 的新模型、实现以及生效场景,最后会进行场景测试,对比 Threaded I/O 关闭与开启,以及启用 Threaded I/O 与在单实例上搭建集群的性能差异。如果你已经了解过 Redis 的循环流程,可以直接跳至Threaded I/O 相关的部分;如果你只关心新功能的实际提升,可以跳至性能测试部分查看。

Redis 是如何运行的

事件循环

main

Redis 的入口位于 server.c 下,main()方法流程如图所示。 main()方法中 Redis 首先需要做的是初始化各种库以及服务配置。具体举例:

初始化结束后,开始读取用户的启动参数,和大多数配置加载过程类似,Redis 也通过字符串匹配等分析用户输入的argcargv[],这个过程中可能会发生:

解析完参数之后,执行loadServerConfig()读取配置文件并与命令行参数 options 的内容进行合并,组成一个config变量,并且逐个将 name 和 value 设置进 configs 列表中。对于每个 config,有对应的 switch-case 的代码,例如对于loadmodule,会执行queueLoadModule()方法,以完成真正的配置加载:

...
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {   
            ... 
        } else if (!strcasecmp(argv[0],"loadmodule") && argc >= 2) {
            queueLoadModule(argv[1],&argv[2],argc-2);
        } else if (!strcasecmp(argv[0],"sentinel")) {
...

回到main方法的流程,Redis 会开始打印启动的日志,执行initServer()方法,服务根据配置项,继续server对象初始化内容,例如:

此后就是一些根据不同运行模式的初始化,例如常规模式运行时会记录常规日志、加载磁盘持久化的数据;而在 sentinel 模式运行时记录哨兵日志,不加载数据等。

在所有准备操作都完成后,Redis 开始陷入aeMain()的事件循环,在这个循环中会不断执行aeProcessEvents()处理发生的各种事件,直到 Redis 结束退出

两种事件

Redis 中存在有两种类型的事件:时间事件文件事件

时间事件也就是到了一定时间会发生的事件,在 Redis 中它们被记录成一个链表,每次创建新的时间事件的时候,都会在链表头部插入一个aeTimeEvent节点,其中保存了该事件会在何时发生,需要调用什么样的方法处理。遍历整个链表我们可以知道离最近要发生的时间事件还有多久,因为链表里面的节点按照自增 id 顺序排列,而在发生时间的维度上时乱序的。

文件事件可以看作 I/O 引起的事件,客户端发送命令会让服务端产生一个读 I/O,对应一个读事件;同样当客户端等待服务端消息的时候需要变得可写,让服务端写入内容,因此会对应一个写事件。AE_READABLE事件会在客户端建立连接、发送命令或其他连接变得可读的时候发生,而AE_WRITABLE事件则会在客户端连接变得可写的时候发生。

文件事件的结构简单很多,aeFileEvent记录了这是一个可读事件还是可写事件,对应的处理方法,以及用户数据。

如果同时发生了两种事件,Redis 会优先处理AE_READABLE事件。

aeProcessEvents

aeProcessEvents()方法处理已经发生和即将发生的各种事件

aeMain()循环进入aeProcessEvents()后,Redis 首先检查下一次的时间事件会在什么时候发生,在还没有时间事件发生的这段时间内,可以调用多路复用的 API aeApiPoll()阻塞并等待文件事件的发生。如果没有文件事件发生,那么超时后返回 0,否则返回已发生的文件事件数量numevents

在有文件事件可处理的情况下,Redis 会调用AE_READABLE事件的rfileProc方法以及AE_WRITABLE事件的wfileProc方法进行处理:

...
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd];
            }

            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
...

在完成前面的处理后,Redis 会继续调用processTimeEvents()处理时间事件。遍历整个时间事件链表,如果此时已经过了一段时间(阻塞等待或处理文件事件耗时),有时间事件发生,那么就调用对应时间事件的timeProc方法,将所有已经过时的时间事件处理掉:

...
        if (te->when <= now) {
            ...
            retval = te->timeProc(eventLoop, id, te->clientData);
            ...
            processed++;
            ...
        }
...

如果执行了文件事件之后还没有到最近的时间事件发生点,那么本次aeMain()循环中将没有时间事件被执行,进入下一次循环。

命令执行前后发生了什么

在客户端连接上 Redis 的时候,通过执行connSetReadHandler(conn, readQueryFromClient),设置了当读事件发生时,使用readQueryFromClient()作为读事件的 Handler 。

在收到客户端的命令请求时,Redis 进行一些检查和统计后,调用read()方法将连接中的数据读取进client.querybuf消息缓冲区中:

void readQueryFromClient(connection *conn) {
    ...
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    ...


static inline int connRead(connection *conn, void *buf, size_t buf_len) {
    return conn->type->read(conn, buf, buf_len);
}

static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
    int ret = read(conn->fd, buf, buf_len);
    ...
}

然后进入processInputBuffer(c)开始读取输入缓冲区中的消息,最后进入processCommand(c)开始处理输入的命令。

在命令执行得到结果后,首先会存放在client.buf中,并且调用调用addReply(client *c, robj *obj)方法,将这个client对象追加到server.clients_pending_write列表中。此时当次的命令,或者说AE_READABLE事件就已经基本处理完毕了,除了一些额外的统计数据、后处理以外,不会再进行发送响应消息的动作。

在当前aeProcessEvents()方法结束后,进入下一次的循环,第二次循环调用 I/O 多路复用接口等待文件事件发生前,Redis 会检查server.clients_pending_write是否有客户端需要进行回复,若有,遍历指向各个待回复客户端的server.clients_pending_write列表,逐个将客户端从中删除,并将待回复的内容通过writeToClient(c,0)回复出去

int writeToClient(client *c, int handler_installed) {
    ...
    nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
    ...

static inline int connWrite(connection *conn, const void *data, size_t data_len) {
    return conn->type->write(conn, data, data_len);
}

static int connSocketWrite(connection *conn, const void *data, size_t data_len) {
    int ret = write(conn->fd, data, data_len);
    ...
}

Threaded I/O 模型

I/O 问题与 Threaded I/O 的引入

如果要说 Redis 会有什么性能问题,那么从 I/O 角度,由于它没有像其他 Database 一样使用磁盘,所以不存在磁盘 I/O 的问题。在数据进入缓冲区前及从缓冲区写至 Socket 时,存在一定的网络 I/O,特别是写 I/O 对性能影响比较大。以往我们会考虑做管道化来减小网络 I/O 的开销,或者将 Redis 部署成 Redis 集群来提升性能。

在 Redis 6.0 之后,由于 Threaded I/O 的引入,Redis 开始支持对网络读写的线程化,让更多的线程参与进这部分动作中,同时保持命令的单线程执行。这样的改动从某种程度上说可以既提升性能,但又避免将命令执行线程化而需要引入锁或者其他方式解决并行执行的竞态问题。

Threaded I/O 在做什么

在老版本的实现中,Redis 将不同 client 的命令执行结果保存在各自的client.buf中,然后把待回复的client存放在一个列表里,最后在事件循环中逐个将buf的内容写至对应 Socket 。对应在新版本中,Redis 使用多个线程完成这部分操作。

对读操作,Redis 同样地为server对象新增了一个clients_pending_read属性,当读事件来临时,判断是否满足线程化读的条件,如果满足,那么执行延迟读操作,将这个client对象添加到server.clients_pending_read列表中。和写操作一样,留到下一次事件循环时使用多个线程完成读操作。

Threaded I/O 的实现与限制

Init 阶段

在 Redis 启动时,如果满足对应参数配置,会进行 I/O 线程初始化的操作。

void initThreadedIO(void) {
    server.io_threads_active = 0;
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }
...

Redis 会进行一些常规检查,配置数是否符合开启多线程 I/O 的要求。

...
    for (int i = 0; i < server.io_threads_num; i++) {
        io_threads_list[i] = listCreate();
...

创建一个长度为线程数的io_threads_list列表,列表的每个元素都是另一个列表 L,L 将会用来存放对应线程待处理的多个client对象。

...
        if (i == 0) continue;
...

对于主线程,初始化操作到这里就结束了。

...
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
...

io_threads_mutex是一个互斥锁列表,io_threads_mutex[i]即第i个线程的锁,用于后续阻塞 I/O 线程操作,初始化之后将其暂时锁定。然后再对每个线程执行创建操作,tid即其指针,保存至io_threads列表中。新的线程会一直执行IOThreadMain方法,我们将它放到最后讲解。

Reads/Writes

多线程的读写主要在handleClientsWithPendingReadsUsingThreads()handleClientsWithPendingWritesUsingThreads()中完成,因为两者几乎是对称的,所以这里只对读操作进行讲解,有兴趣的同学可以检查一下写操作有什么不同的地方以及为什么。

int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
...

同样,Redis 会进行常规检查,是否启用线程化读写并且启用线程化读(只开启前者则只有写操作是线程化),以及是否有等待读取的客户端。

...
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
...

这里将server.clients_pending_read的列表转化为方便遍历的链表,然后将列表的每个节点(*client对象)以类似 Round-Robin 的方式分配个各个线程,线程执行各个 client 的读写顺序并不需要保证,命令抵达的先后顺序已经由server.clients_pending_read/write列表记录,后续也会按这个顺序执行。

...
    io_threads_op = IO_THREADS_OP_READ;
...

设置状态标记,标识当前处于多线程读的状态。由于标记的存在,Redis 的 Threaded I/O 瞬时只能处于读或写的状态,不能部分线程读,部分写。

...
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
...

为每个线程记录下各自需要处理的客户端数量。当不同线程读取到自己的 pending 长度不为 0 时,就会开始进行处理。注意j从 1 开始,意味着0的主线程的 pending 长度一直为 0,因为主线程马上要在这个方法中同步完成自己的任务,不需要知道等待的任务数。

...
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);
...

主线程此时将自己要处理的 client 处理完。

...
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");
...

陷入循环等待,pending等于各个线程剩余任务数之和,当所有线程都没有任务的时候,本轮 I/O 处理结束。

...
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            if (processCommandAndResetClient(c) == C_ERR) {
                continue;
            }
        }
        processInputBuffer(c);
    }
...

我们已经在各自线程中将conn中的内容读取至对应 client 的client.querybuf输入缓冲区中,所以可以遍历server.clients_pending_read列表,串行地进行命令执行操作,同时将client从列表中移除。

...
    server.stat_io_reads_processed += processed;

    return processed;
}

处理完成,将处理的数量加到统计属性上,然后返回。

IOThreadMain

前面还有每个线程具体的工作内容没有解释,它们会一直陷在IOThreadMain的循环中,等待执行读写的时机。

void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
...

照常执行一些初始化内容。

...
    while(1) {
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }


        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
...

线程会检测自己的待处理的 client 列表长度,当等待队列长度大于 0 时往下执行,否则会到死循环起点。

这里利用互斥锁,让主线程有机会加锁,使得 I/O 线程卡在执行pthread_mutex_lock(),达到让 I/O 线程停止工作的效果。

...
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
...

io_threads_list[i]的客户端列表转化为方便遍历的链表,逐个遍历,借助io_threads_op标志判断当前是要执行多线程读还是多线程写,完成对自己要处理的客户端的操作。

...
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}

清空自己要处理的客户端列表,并且将自己的待处理数量修改为 0,结束本轮操作。

Limitation

通过查看代码,使用上 Threaded I/O 的启用受以下条件影响:

性能测试

我们编译了 unstable 版本的 Redis 进行性能测试,测试工具为 Redis 自带的 redis-benchmark,统计输出的 RPS 值作为参考。

Server 实例: AWS / m5.2xlarge / 8 vCPU / 32 GB
Benchmark Client 实例: AWS / m5.2xlarge / 8 vCPU / 32 GB
Command: redis-benchmark -h 172.xx.xx.62 -p 6379 -c 100 -d 256 -t get,set -n 10000000 --threads 8

Threaded I/O off vs. Threaded I/O on

我们对比了原有的单线程 I/O 以及开启 2 线程 /4 线程的 Threaded I/O 时的表现,结果如图所示。 在开启io-threads-do-reads选项的情况下,Threaded I/O 作用于读操作,也能让性能有进一步提升,但是没有将写 I/O 线程化提升明显。另外我们还尝试使用了大体积 Payload (-d 8192)进行测试,得出结果的提升百分比并没有太大差异。

Threaded I/O vs. Redis Cluster

以往开发者会通过在单台实例上部署 Redis Cluster 来尝试让 Redis 使用上更多的 CPU 资源,我们也尝试对比了一下这种情景下的表现。 在新版本中,redis-benchmark 也得到了更新,开始支持对 Redis Cluster 的测试,通过开启--cluster参数即可检测集群模式和配置。我们在这一组对比测试中看到单实例构建集群的强大性能,在实际测试中,3 个进程的 CPU 使用率均在 80%-90%,说明仍有提升的空间。当改用测试参数-c 512时,集群能够跑出超过 40 万 RPS 的成绩。尽管测试与实际使用会有所区别,并且我们在构建集群的时候选择了不附带 Slave,但是仍然能看出来在几种模型中,构建 Cluster 能真正使用上多线程进行网络 I/O 、命令执行,对性能的提升也是最大的。

总结与思考

Redis 6.0 引入的 Threaded I/O,将 Socket 读写延迟和线程化,在网络 I/O 的方向上给 Redis 带来了一定的性能提升,并且使用门槛比较低,用户无需做太多的变更,即可在不影响业务的情况下白嫖空闲的线程资源。

另一方面,从测试结果上看,这部分的提升可能还难以让处于 Redis 5 甚至 Redis 3 版本的用户有足够的动力进行升级,特别是考虑到很多业务场景中 Redis 的性能并没有差到成为瓶颈,而且新版本的福利也未经过大规模验证,势必会影响到企业级应用中更多用户关注的服务稳定性。同时,TIO 的提升对比集群性能似乎还有一定的差距,这可能更加会让原本就处于集群架构的企业用户忽略这个功能。

但无论如何,用户肯定乐于见到更多的新功能、更多优化提升出现在 Redis 上。在保持一贯稳定性的前提下,本次的版本可以说是 Redis 从诞生至今最大的更新,不只有 Threaded I/O,包括 RESP3 、ACLs 和 SSL,我们期待这些新 Feature 能够在更多的应用场景下得到推广、验证和使用,也希望未来的版本能够给用户带来更多的惊喜和更好的体验。

Further Reading: Understanding Redis

作为一位从来没有使用过 C/类 C 语言的开发者,Redis 简洁的代码和详尽的注释为我阅读和理解其实现提供了极大的帮助。在文末我想要分享一下自己学习 Reids 的一些途径、工具和方法。

README.md应该是我们了解 Redis 的入口,而不是全局搜索main()方法。请关注Redis internals小节下的内容,这里介绍了 Redis 的代码结构,Redis 每个文件都是一个“general idea”,其中server.cnetwork.c的部分逻辑和代码在本文已经介绍过了,持久化相关的aof.crdb.c、数据库相关的db.c、Redis 对象相关的object.c、复制相关的replication.c等都值得留意。其他包括 Redis 的命令是以什么样的形式编码的,也能在README.md中找到答案,这样可以方便我们进一步阅读代码时快速定位。

Documentation 主页redis-doc repo是 Redis 文档的集合处,请注意后者的topics目录下有非常多有趣的主题,我对“有趣”的定义是像这样的文章:

作为开发者,在深入学习的阶段,这些内容能让大家从“使用”变为“了解”,然后发现 Redis 原来能做更多的事情。所以如果缺乏时间阅读和调试源码,将topics下的 60 多篇文档看一遍,大概是了解 Redis 最快的方法。

最后,如果你能看到这里,大概也会对 Redis 的源码有那么一点兴趣。因为本身并不了解 C 语言,所以我大概率会选择借助一个 IDE,在main()打上断点,然后流程的起点开始看,实际上我也确实是这么做的。另外几个代码的关键点,其实也在本文中出现过:

如果像本文一样想了解 Network 的内容,可以在aeMain()处打断点,然后关注中network.c中的方法;如果想关注具体命令相关的内容,可以在processInputBuffer()处打断点,然后关注$command.c或者类似文件中的方法,README.md文件里也已经介绍过命令方法的命名格式,定位非常容易。其余经常出现的其他动作,例如持久化、复制等,大概会出现在命令执行的前后,或者时间事件内,也可能在beforeSleep()中。server.h中定义的redisServerclient是 Redis 中两个非常重要的结构,在业务上很多内容都是转化为对它们的属性的相关操作,要特别留意。

除此以外,Antirez 曾经在Youtube上发布过一些开发的录播视频,RedisLab则有一些相对冷门使用场景的实践介绍,这些会比上面的其他学习来得更轻松些,最大的难处可能就是听懂演讲者们的口音,特别是 Antirez 本人,万幸 Youtube 的字幕功能非常强大,能解决不少麻烦。

7177 次点击
所在节点    Redis
14 条回复
lidlesseye11
2020-10-02 13:37:03 +08:00
拖到最后竟然没有公众号推广!爱了爱了
RedisMasterNode
2020-10-02 13:48:40 +08:00
@lidlesseye11 hhhh 开头有邮箱了 欢迎交流安利
ooToo
2020-10-02 15:38:32 +08:00
在达实支持一个
ppolanwind
2020-10-02 16:14:36 +08:00
想问一下演示文稿是怎么制作的,看起来很清爽,颜值很高
RedisMasterNode
2020-10-02 16:32:00 +08:00
@ppolanwind 如果你说的是图片的话 用 sketch 画的
mightofcode
2020-10-02 16:43:56 +08:00
太长了 有大佬总结下么
RedisMasterNode
2020-10-02 16:49:42 +08:00
@mightofcode 哈哈,那

Redis 的 ieda 是把接收到的事件都延迟进行处理,比如积累了一堆读事件(用户发命令过来),延迟之后用多线程读取,然后单线程顺序执行;写也是一样,把很多个已经准备好回复内容的连接累积起来,晚一点再用多线程写给各个 client
ppolanwind
2020-10-02 16:50:07 +08:00
@RedisMasterNode 感谢分享
putaozhenhaochi
2020-10-02 17:09:56 +08:00
大佬🐮
E1n
2020-10-02 23:38:42 +08:00
厉害👍
qq960826
2020-10-03 00:05:04 +08:00
卧槽居然还能碰到我们公司的大佬
zhengyouxiang
2020-10-09 11:11:17 +08:00
想知道 sketch 怎么画出这么好看的图片
RedisMasterNode
2020-10-09 15:25:37 +08:00
@zhengyouxiang 就是方框方框加方框呀 其实仔细看图里面也没什么高大上的元素在 就方框文字方框文字箭头
YouLMAO
2021-02-11 00:36:18 +08:00
Disk network io 为啥不用 epoll,这有啥用?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/712205

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX