Redis 6.0之后的多线程实现源码分析
本文主要是介绍redis多线程部分的理解,很久以前看过单线程版本的实现,最近看了几篇文章介绍多线程的原理。有些文章只是讲了些理由,看着有点模糊,对多线程为什么能提升效率,以及准确的执行点还是有点模糊。本文主要从源码阅读的角度,来梳理下其调用链,以及根据代码实现来分析多线程的实现原理和背后的设计。
多线程批量处理源码分析
读请求到来,会插入到 clients_pending_read 等到下一次主循环 beforeSleep 集中处理 pending_read
beforeSleep()
is called every time the event loop fired, Redis served a few requests, and is returning back into the event loop.
main -> aeMain -> aeProcessEvents (while !stop)
移除掉非核心的执行逻辑, aeProcessEvents 实现是这样的
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
eventLoop->beforesleep(eventLoop);
numevents = aeApiPoll(eventLoop, tvp);
eventLoop->aftersleep(eventLoop);
for (j =0;j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, ....);
}
if (fe->mask & AE_READABLE) {
fe->wfileProc(eventLoop, fd, ....);
}
}
processTimeEvents(eventLoop);
....
}
总结下这里的逻辑:本次执行的beforesleep 实质上上次全部的,读写事件产生的 pending_read/pending_write
beforesleep 缩减版代码
void beforeSleep(struct aeEventLoop *eventLoop) {
//..........省略了和梳理多线程原理相关的代码
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
//.........................................
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
}
beforeSleep 整个函数是在主线程的eventLoop中执行的, 设计到多线程改造的逻辑
- 使用io thread 处理 pending read
- 使用io thread 处理 pending write
handleClientsWithPendingReadsUsingThreads
主要流程:
- 从 clients_pending_read 列表中取元素,依次分给io_threads (从0开始 item_id++ % io_threads_num)
- 设置io thread接下来的操作类型 io_threads_op = IO_THREADS_OP_READ;
- 主线程从 io_threads_list[0] 获取任务从客户端读取数据,和io线程承担对等的读取任务
- 等待所有的io线程读取任务完成, IOPendingCount 都为0
- 设置io thread操作类型 io_threads_op = IO_THREADS_OP_IDLE;
- 从clients_pending_read 遍历client,解析已经读取好的命令执行,如果有reply,执行 putClientInPendingWriteQueue 投递到 pending_write 队列等待发送给客户端
这里关注下 io_threads_op 当这个值为idle时,会投递到pending列表,不为idle时执行实际的网络io.
handleClientsWithPendingWritesUsingThreads
执行逻辑和 read类似,这里就省略掉了
接下来主要关注 rfileProc / wfileProc 读写回调在多线程情况下的处理
clients_pending_read
通过分析代码调用链,可以得出 pending_read的来源为如下的调用链
readQueryFromClient -> postponeClientRead -> listAddNodeHead(server.clients_pending_read, c);
readQueryFromClient 调用流程梳理
- tcp accept时设定了handler acceptTcpHandler
createSocketAcceptHandler(&server.ipfd, acceptTcpHandler);
- acceptCommonHandler 处理新连接时,执行 createClient
- createClient 创建连接时,设定了读回调 readQueryFromClient
connSetReadHandler(conn, readQueryFromClient);
readQueryFromClient 实现
继续去除非主干代码,其简略版本实现为:
void readQueryFromClient(connection *conn) {
// 如果开启了thread I/O 会执行到这里
if (postponeClientRead(c)) return;
// 后面都是原来的单线程下处理的逻辑,省略
}
可以看到多线程下的改变就是调用了 postponeClientRead,其作用就是,把当前需要读取数据的client对象投递到链表 clients_pending_read ,等待分发到IO线程中进行实际的数据读取,在主EventLoop中不在处理实际的client的读取。
这里是多线程设计的核心,如果读事件到来,扔到pending_read中,实现的类似delay的效果,这样可以集中几个client的读一次统一处理。
再次补充下 io_threads_op 这个重要的全局变量对读写逻辑的影响
在任务分发给了不同的io thread之后,主线程会设置
io_threads_op = IO_THREADS_OP_READ;
这里是一个全局变量,这样会执行 readQueryFromClient 后面实际数据读取,而不是再投递到pending列表中。写逻辑也类似。
clients_pending_write
handleClientsWithPendingReadsUsingThreads ->
putClientInPendingWriteQueue -> listAddNodeHead(server.clients_pending_write,c);
handleClientsWithPendingReadsUsingThreads 主线程会在读线程数据全部读取完后,在线程中执行命令,再将执行完的命令投递到 pending_write列表中
pending_read/write 不加锁的设计
IOThreadMain listNext 从 io_threads_list[id] 这个列表取数据,只读不删 主线程 listAddNodeTail 给 io_threads_list[id] 增加数据 主线程 listDelNode 给 io_threads_list[id] 删除数据
如果只从这个改写数据的模式,单独单写来讲,不需要加锁。从数据交互上讲,写完数据,只有消费完了才能移除掉,这里需要一种合适的通讯方式,能保证删除前,数据被确定的消费了。数据的增删都由一个线程维护,数据是安全的。主要是读取逻辑如果位于增删之间,结果是不确定性的。一旦有了同步机制,能保证顺序的执行,就可以不加锁了。按照这个思想再回到redis对 pending list的处理
// handleClientsWithPendingReadsUsingThreads
int handleClientsWithPendingReadsUsingThreads(void) {
// 1. 列表增加数据添加
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 2. 列表数据添加完之后,集中设定 setIOPendingCount
// 这里其实相当于一个通知信号,数据准备就绪,可以消费了
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
//3. 等待 getIOPendingCount 信号被设定为0
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
//4. 列表数据删除
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
listDelNode(server.clients_pending_read,ln);
c->pending_read_list_node = NULL;
}
return processed;
}
这里的IOPendingCount是一个原子操作保证线程安全 atomicSetWithSync(io_threads_pending[i], count);
补充一点,基于对原子操作的理解,上面的实现还有一个问题,就是指令重排的问题,虽然保证了 IOPendingCount的原子性,但无法保证1,2,3,4是按顺序执行的,指令重排有可能打乱这种顺序。抱着这样的想法,在列一下,改原子操作对应的内存屏障相关的代码
#define atomicGetWithSync(var,dstvar) do { \
dstvar = atomic_load_explicit(&var,memory_order_seq_cst); \
} while(0)
#define atomicGetWithSync(var,dstvar) do { \
dstvar = __atomic_load_n(&var,__ATOMIC_SEQ_CST); \
} while(0)
// 内建的同步,采用的是全内存屏障
#define atomicGetWithSync(var,dstvar) { \
dstvar = __sync_sub_and_fetch(&var,0,__sync_synchronize); \
ANNOTATE_HAPPENS_AFTER(&var); \
} while(0)
文档上也清晰的标注了,使用了线程间的同步
atomicGetWithSync(var,value) – ‘atomicGet’ with inter-thread synchronization atomicSetWithSync(var,value) – ‘atomicSet’ with inter-thread synchronization
IOThreadMain 实现补充
IOThreadMain 源码解析
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
// 设置线程名称,方便调试和定位问题
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
// 设置cpu亲和度
redisSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
// 每轮检查下 IOPendingCount 如果为0,检查下锁,保证主线程可以通过锁block住io线程
/* Give the main thread a chance to stop this thread. */
if (getIOPendingCount(id) == 0) {
// 创建线程前initThreadedIO中 io_threads_mutex 被lock住
// 导致这里阻塞住,直到startThreadedIO执行unlock
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
// 走到这里来 iopendingcount 不应该为0,属于服务器自己的逻辑断言
serverAssert(getIOPendingCount(id) != 0);
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
// io线程当前全局的操作类型为写数据
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
// io线程当前全局的操作类型为读数据
readQueryFromClient(c->conn);
} else {
// 走到这里表示主线程没有合理的设定io线程的操作类型,但是分配了任务,io线程无法判断要做什么
serverPanic("io_threads_op value is unknown");
}
}
// 列表已经访问完了,清空
listEmpty(io_threads_list[id]);
// 设定标记,通知主线程任务完成
setIOPendingCount(id, 0);
}
}
总结
redis多线程,解决的是将一次eventloop执行的读写操作并行的放入io线程执行,实际上的命令执行还是在主线程中单线程执行。这样将多个命令的sum(IO1 + IO2 + IO3) 缩减为 max(IO1,IO2,IO3). 通过原子操作以及内存屏障控制主线程和io线程的同步实现无锁控制。锁本身弱化为,提供主线程block的机智,而不是用于数据交互或者同步。