https://blog.2014bduck.com/archives/326
上周参加了字节跳动的面试,也是 18 年毕业后的首次面试,整场下来一共 70 分钟,面试官非常 Nice,无奈自己太过紧张,很多准备好的知识点都没有能够准确传达意思,所以错失了这次的机会。
面试中因为在简历上有提到 Redis 相关的内容,那么毫无疑问就会被问到了。先从经典的问题开始:Reids 为什么这么快?那自然会回答诸如单线程、IO 多路复用等固定套路,然后这里因为一直有关注 Redis 的相关新闻,知道 Redis 6.0 年末发布了 RC1 版本,其中新特性包括多线程 IO,那么自然想在面试中提及一下。面试官应该对这点比较感兴趣,于是就继续探讨了这个多线程 IO 的模型。
这里先总结一下:
于是坑坑洼哇地坚持完了 70 分钟的面试,再总结一下做得不足的地方,因为是 1.5Year 经验,面试官主要考察:
所以就这样结束了第一次的社招面试,整体来说几个方向的基础知识需要回去再多写多看就可以了,然后表达上尽量控制时间和范围,深入的内容如果面试官希望和你继续探讨,自然会发问,如果没问,可以提及但是不应该直接展开讲。
面试结束后马上知道这块的回答有问题,检查果然如此。所以也就借这个机会将 Threaded IO 对应的源码看了一遍,后续如果有机会的话,希望能跟下一位面试官再来探讨这个模型。
本次新增的代码位于networking.c
中,很显然多线程生效的位置就能猜出来是在网络请求上。作者希望改进读写缓冲区的性能,而不是命令执行的性能主要原因是:
那么将读写缓冲区改为多线程后整个模型大致如下:
首先,如果用户没有开启多线程 IO,也就是io_threads_num == 1
时直接按照单线程模型处理;如果超过线程数IO_THREADS_MAX_NUM
上限则异常退出。
紧接着 Redis 使用 listCreate()创建 io_threads_num 个线程,并且对主线程( id=0 )以外的线程进行处理:
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
// 如果用户没有开启多线程 IO 直接返回 使用主线程处理
if (server.io_threads_num == 1) return;
// 线程数设置超过上限
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
// 初始化 io_threads_num 个对应线程
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; // Index 0 为主线程
/* Things we do only for the additional threads. */
// 非主线程则需要以下处理
pthread_t tid;
// 为线程初始化对应的锁
pthread_mutex_init(&io_threads_mutex[i],NULL);
// 线程等待状态初始化为 0
io_threads_pending[i] = 0;
// 初始化后将线程暂时锁住
pthread_mutex_lock(&io_threads_mutex[i]);
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
// 将 index 和对应线程 ID 加以映射
io_threads[i] = tid;
}
}
Redis 需要判断是否满足 Threaded IO 条件,执行if (postponeClientRead(c)) return;
,执行后会将 Client 放到等待读取的队列中,并将 Client 的等待读取 Flag 置位:
int postponeClientRead(client *c) {
if (io_threads_active && // 线程是否在不断(spining)等待 IO
server.io_threads_do_reads && // 是否多线程 IO 读取
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{//client 不能是主从,且未处于等待读取的状态
c->flags |= CLIENT_PENDING_READ; // 将 Client 设置为等待读取的状态 Flag
listAddNodeHead(server.clients_pending_read,c); // 将这个 Client 加入到等待读取队列
return 1;
} else {
return 0;
}
}
这时 server 维护了一个clients_pending_read
,包含所有处于读事件 pending 的客户端列表。
首先,Redis 检查有多少等待读的 client:
listLength(server.clients_pending_read)
如果长度不为 0,进行 While 循环,将每个等待的 client 分配给线程,当等待长度超过线程数时,每个线程分配到的 client 可能会超过 1 个:
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
并且修改每个线程需要完成的数量(初始化时为 0 ):
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
等待处理直到没有剩余任务:
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
最后清空 client_pending_read:
listRewind(server.clients_pending_read,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~ CLIENT_PENDING_COMMAND;
processCommandAndResetClient(c);
}
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);
在上面的过程中,当任务分发完毕后,每个线程按照正常流程将自己负责的 Client 的读取缓冲区的内容进行处理,和原来的单线程没有太大差异。
每轮处理中,需要将各个线程的锁开启,并且将相关标志置位:
void startThreadedIO(void) {
if (tio_debug) { printf("S"); fflush(stdout); }
if (tio_debug) printf("--- STARTING THREADED IO ---\n");
serverAssert(io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
// 解开线程的锁定状态
pthread_mutex_unlock(&io_threads_mutex[j]);
// 现在可以开始多线程 IO 执行对应读 /写任务
io_threads_active = 1;
}
同样结束时,首先需要检查是否有剩余待读的 IO,如果没有,将线程锁定,标志关闭:
void stopThreadedIO(void) {
// 需要停止的时候可能还有等待读的 Client 在停止前进行处理
handleClientsWithPendingReadsUsingThreads();
if (tio_debug) { printf("E"); fflush(stdout); }
if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
(int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write));
serverAssert(io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
// 本轮 IO 结束 将所有线程上锁
pthread_mutex_lock(&io_threads_mutex[j]);
// IO 状态设置为关闭
io_threads_active = 0;
}
Redis 的 Threaded IO 模型中,每次所有的线程都只能进行读或者写操作,通过io_threads_op
控制,同时每个线程中负责的 client 依次执行:
// 每个 thread 有可能需要负责多个 client
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
// 当前全局处于写事件时,向输出缓冲区写入响应内容
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
// 当前全局处于读事件时,从输入缓冲区读取请求内容
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
每个线程执行readQueryFromClient
,将对应的请求放入一个队列中,单线程执行,最后类似地由多线程将结果写入客户端的 buffer 中。
Threaded IO 将服务读 Client 的输入缓冲区和将执行结果写入输出缓冲区的过程改为了多线程的模型,同时保持同一时间全部线程均处于读或者写的状态。但是命令的具体执行仍是以单线程(队列)的形式,因为 Redis 希望保持简单的结构避免处理锁和竞争的问题,并且读写缓冲区的时间占命令执行生命周期的比重较大,处理这部分的 IO 模型会给性能带来显著的提升。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.