0x00 在内核实现初步的线程机制
0x01 什么是线程 线程就是程序中的一条执行流 ,也就是代码从上到下运行的一条路径。在一个程序里,可以同时存在多条执行流,每一条执行流就是一个线程,它们可以并发地执行不同的代码。
为了支持并发执行,每个线程都有独立的寄存器上下文和栈,用于维护自身的执行状态;但线程之间共享进程的地址空间,包括堆、全局变量和代码段等资源。这也是线程与进程的重要区别之一。
进程?什么是进程:进程是资源分配的单位,包含一个或多个线程,也就是说进程 = 资源 + 线程。
线程是 cpu 调度的单位,进程是资源分配的单位并包含了线程。
0x02 线程调度机制 0x01 如何控制线程 用一块内存来存储线程的执行信息,OS 才认识“线程”。这块内存称为程序控制块:对于进程用 PCB (包括资源和执行信息),对于线程用 TCB (只有执行信息)。如下图:
我们实现的时候为了方便统一用一个类似 PCB 的 task_struct 来作为程序控制块,给线程用的时候不分配资源即可。
既然要调度线程,也就是调度执行流,肯定要自己控制栈:毕竟栈天然地和执行流绑定。自己控制栈最方便的方法就是自己定义一个结构体在一块内存中,要使用的时候令 esp 指向它即可。这样便可以完全控制一个栈帧,也就完全控制了一个执行流。(毕竟 call ret 等指令都和栈息息相关)。
为什么说完全控制?当我们想执行栈顶某个地址的函数时,直接执行 ret 即可,ret 翻译一下就是 pop eip。同时因为整个栈帧的内容都由我们自己控制,参数和返回地址自然也随便控制了。
以下是相关的结构定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 enum _task_status { TASK_RUNNING, TASK_READY, TASK_BLOCKED, TASK_WAITING, TASK_HANGING, TASK_DIED }; struct _intr_stack { uint32_t vec; uint32_t edi; uint32_t esi; uint32_t ebp; uint32_t esp_dummy; uint32_t ebx; uint32_t edx; uint32_t ecx; uint32_t eax; uint32_t gs; uint32_t fs; uint32_t es; uint32_t ds; uint32_t err_code; void (*eip) (void ); uint32_t cs; uint32_t eflags; void * esp; uint32_t ss; }; struct _thread_stack { uint32_t ebx; uint32_t edi; uint32_t esi; uint32_t ebp; void (*eip)(thread_func* func, void * func_arg); void (*unused_retaddr); thread_func* function; void * func_arg; }; struct _task_struct { uint32_t * self_kstack; enum _task_status status ; char name[16 ]; uint8_t priority; uint8_t ticks; uint32_t elapsed_ticks; struct list_elem general_tag ; struct list_elem all_list_tag ; uint32_t * pgdir; uint32_t stack_magic; };
在创建一个线程时,先申请一页内存放一个_task_struct,然后再初始化好其中的相关内容即可。
具体如何调度线程,也就是如何切换到某个线程来执行呢?
如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 [bits 32] section .text global switch_task switch_task: ;栈中此处是返回地址 ; 注意这里入栈要和 _thread_stack 中顺序一致 push ebp mov ebp, esp push esi push edi push ebx mov eax, [ebp + 8] ;得到栈中的参数cur mov [eax], esp ;保存栈顶指针esp,task_struct的self_kstack字段 ;self_kstack在task_struct中的偏移为0 ;所以直接往thread开头处存4字节即可 ; =========== 以上是备份当前线程的环境,下面是恢复下一个线程的环境 mov eax, [ebp + 12] ;栈中的参数next mov esp, [eax] ;恢复下一个线程的栈顶 pop ebx ; 从next线程栈中恢复其寄存器, 下同 pop edi pop esi pop ebp ret ;返回到next线程上次压入的返回地址 ;如果此时的 next 线程之前尚未执行过,第一次执行,此时栈顶的值是初始化时设置的 kernel_thread 函数来调用指定定函数
这个汇编实现了一个函数 switch_task(cur, next);,从线程 cur 切换到线程 next。这里使用的栈就是我们自己定义的_thread_stack 结构体。cur 或者 next 指针指向的就是_task_struct,其第一个成员就是线程栈顶指针,这个函数先保存好寄存器和cur的栈顶指针,然后把栈切换到next的线程栈,从中恢复寄存器,然后调用ret,此时
1 2 3 4 5 6 7 8 ebx edi esi ebp eip ← ret unused_retaddr ← 返回地址 function ← 参数1 func_arg ← 参数2
这样就跳转到了我们在next的线程栈中布置好的需要执行的函数地址。我们专门设置一个函数如下
1 2 3 4 5 6 7 static void thread_start (thread_func* function, void * func_arg) { _enable_intr(); function(func_arg); }
他来执行不同线程特定的function。也就是说初始化一个线程的时候在把的线程栈的eip成员赋值为这个函数。
0x02 调度线程的策略 使用双向链表来实现一个任务队列,来调度线程。
注意这里使用“侵入式”双向链表。
即链表结点只有两个指针成员,没有数据。通过把结点放入目标结构中作为其一部分实现连接。如下图
这里任务队列可以实现一个优先队列,决定调度执行哪个线程。我们姑且使用最简单的队列来调度,就按顺序一个一个来。
0x03 调度线程的过程 操作系统通过时钟中断来调度线程。
假设有两个线程,cur_thread 和 next_thread ,发生时钟中断,从前者调度到后者,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 static void _timer_intr_handler(void ){ struct _task_struct * cur_thread = running_thread(); ASSERT(cur_thread->stack_magic == 0xdeadbeef ); cur_thread->elapsed_ticks++; ticks++; if (cur_thread->ticks == 0 ) { task_schedule(); }else { cur_thread->ticks--; } } ... void task_schedule (void ) { ASSERT(_get_intr_status() == INTR_OFF); struct _task_struct * cur = running_thread(); if (cur->status == TASK_RUNNING) { ASSERT(!elem_find(&thread_ready_list, &cur->general_tag)); list_append(&thread_ready_list, &cur->general_tag); cur->ticks = cur->priority; cur->status = TASK_READY; }else { } ASSERT(!list_empty(&thread_ready_list)); thread_tag = NULL ; thread_tag = list_pop(&thread_ready_list); struct _task_struct * next = elem2entry(struct _task_struct, general_tag, thread_tag); next->status = TASK_RUNNING; switch_task(cur, next); }
复习一下我们实现的中断机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 global _intr_entry_table _intr_entry_table: %macro VECTOR 2 ;这里定义一个多行宏作为 intr_handler,后面的2是指传递两个参数,里面的%1,%2代表参数 section .text intr%1entry: ; ============= 默认压入错误码或者空操作,保证最后都会需要跨过一个4字节来进行iret %2 ; ============= 保存上下文 push ds push es push fs push gs pushad ;压入通用寄存器 ; ============= 发送中断结束信号 mov al, 0x20 ; 中断结束命令EOI,这里R为0,SL为0,EOI为1 mov dx, 0xa0 ; 中断控制器从片端口 out dx, al ; 向从片发送 mov dx, 0x20 ; 中断控制器主片端口 out dx, al ; 向主片发送 ; ============= 压入中断向量号并调用中断处理函数 push %1 call [IntrHandlerTable + %1*4] jmp intr_exit ; ============= intr_entry_table数组(编译后会将相同的section合并成一个段) section .data dd intr%1entry ;存储各个中断入口程序的地址, %endmacro ;多行宏结束标志
会压入大量的上下文和一些中断信息,再跳转到这里之前还会压入eip、cs、eflags、esp和ss寄存器。这就是我们此前定义的_intr_stack发挥用处的地方了,这里中断处理程序压入的上下文正好填充在我们定义的_intr_stack。看如下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void thread_init_stack (struct _task_struct* pthread, thread_func function, void * func_arg) { pthread->self_kstack = (uint32_t *)((uint8_t *)pthread->self_kstack - sizeof (struct _intr_stack)); pthread->self_kstack = (uint32_t *)((uint8_t *)pthread->self_kstack - sizeof (struct _thread_stack)); struct _thread_stack * kthread_stack = (struct _thread_stack*)pthread->self_kstack; kthread_stack->eip = kernel_thread; kthread_stack->function = function; kthread_stack->func_arg = func_arg; kthread_stack->ebp = kthread_stack->ebx = kthread_stack->esi = kthread_stack->edi = 0 ; }
初始化一个线程栈的时候预留了中断栈的空间。
中断栈填满后就到了_thread_stack也就是线程栈
执行 switch_task(cur, next);,翻译成汇编就是
1 2 3 push next_thread push cur_thread call switch_task
再展开一点就是
1 2 3 4 push next_thread push cur_thread push retaddr jmp switch_task
然后switch_task开头会保存那几个寄存器,此时 cur 线程的线程栈就被保存为:
1 2 3 4 5 6 7 8 ebx -- ebx 成员 (栈顶) edi -- edi 成员 esi -- esi 成员 ebp -- ebp 成员 switch_task_retaddr -- eip 成员,是switch_task返回地址 next_thread -- unused_retaddr 成员 cur_thread -- function 成员 task_schedule_retaddr -- func_arg 成员
然后执行到mov esp, [eax]之后就切换到了next_thread的栈, 也是如下结构
1 2 3 4 5 6 7 8 ebx edi esi ebp eip ← ret unused_retaddr ← 返回地址 function ← 参数1 func_arg ← 参数2
会ret 跳转到 eip 成员。此时eip成员的值有两种情况:
我们就认为是第二种情况的话,其栈帧就会和
1 2 3 4 5 6 7 8 ebx -- ebx 成员 (栈顶) edi -- edi 成员 esi -- esi 成员 ebp -- ebp 成员 switch_task_retaddr -- eip 成员,是switch_task返回地址 next_thread -- unused_retaddr 成员 cur_thread -- function 成员 task_schedule_retaddr -- func_arg 成员
一致,那么此时会跳转到switch_task返回地址返回到task_schedule的结束部分,然后跳转到task_schedule返回地址,也就返回到_timer_intr_handler时钟中断处理程序的结束部分,最后
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 call [IntrHandlerTable + %1*4] jmp intr_exit ; ============= intr_entry_table数组(编译后会将相同的section合并成一个段) section .data dd intr%1entry ;存储各个中断入口程序的地址, %endmacro ;多行宏结束标志 section .text global intr_exit intr_exit: ; =========== 恢复上下文 add esp, 4 ; 跳过中断号 popad pop gs pop fs pop es pop ds add esp, 4 ; 跳过error_code ; =========== 返回 iretd
如上所示从中断中返回,也就是说上次被时钟中断调度的时候执行到哪,这次就回到那里继续执行 !非常完美。
我们执行一下看看呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include "kernel/print.h" #include "init.h" #include "debug.h" #include "interrupt.h" #include "memory.h" #include "thread.h" void tA (void *) ;void tB (void *) ;int main (void ) { print("kernel made by r3t2\n" ); init(); char * strA = "TA" ; char * strB = "TB" ; thread_create("testA" , 31 , tA, strA); thread_create("testB" , 8 , tB, strB); _enable_intr(); while (1 ) { print("main thread" ); } return 0 ; } void tA (void * arg) { char * str = arg; print(str); } void tB (void * arg) { char * str = arg; print(str); }
如上所示,使用 qemu 执行一下
缺页中断!(我优化了一下默认中断处理程序会输出中断类型,暂时还没有实现缺页中断的处理)
为什么?试想一下,如果一个线程被调度执行,执行完毕之后呢?
也就是说kernel_thread(thread_func* function, void* func_arg)这个函数执行完成了,需要返回,此时栈上的返回地址是:unused_retaddr。会返回到未设置的地方!这里的缺页地址正是一个典型的未初始化的垃圾值。所以说还需要一个给执行完毕的线程“处理后事”的函数,来让线程正确的结束退出。(线程退出需要回收其占用的内存等等资源,我们的内存管理还未完全完善,这一点留待后续再做)
所以暂时在kernel_thread(thread_func* function, void* func_arg)返回前加一个死循环,后面实现一个线程退出函数再去掉即可。或者把几个线程执行的函数后面加上死循环。再测试就正常了。
我们好像还少考虑了什么…线程安全!
实现线程怎么能不考虑线程安全呢?按照参考书说法,这里再继续执行一会就会触发 #GP General Protection Exception 这个中断,因为我们实现的打印函数不是线程安全的:对于显存、硬件的光标寄存器和全局的打印内容缓冲区都没有保护。
为了保证打印操作的原子性,在打印前关中断避免其他线程的调度,打印后开中断恢复,就可以了。但这只是权宜之计。
0x03 同步机制:锁 怎么保证对于共享资源并发访问时候的线程安全呢?
可以把共享资源看作一个房间:很多人(线程)都想进入,但必须保证同一时间的访问是有序的。每个人(线程)进入时,看到的状态是完整一致的(读不被干扰),在里面进行的修改也是独占的(写不被打断),从而避免相互影响。
那么:每个人进去前给房间上一把锁,如果已经有锁了那么就得等里面的人出来把锁打开,然后自己再上锁进去,在里面修改完别人才能看到房间里的状态或者进行其他修改。
那么这个机制如何实现呢?其实和类比稍微有点区别,不是“我去上锁”,而是“我去拿锁”。
基于 Semaphore(信号量) 的锁,本质就是用一个“计数器”来控制。初始时信号量设为1,当线程尝试获取锁时,就检查信号量并尝试减一,如果信号量大于等于0,说明可以获取到锁,那么线程可以访问资源。如果信号量小于0,说明锁被其他线程获取了,此线程进入阻塞等待其他线程释放锁才能访问资源。
如下实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 struct semaphore { uint8_t value; struct list waiters ; }; struct lock { struct _task_struct * holder ; struct semaphore semaphore ; uint32_t holder_repeat_nr; }; void sema_init (struct semaphore* psema, uint8_t value) { psema->value = value; list_init(&psema->waiters); } void lock_init (struct lock* plock) { plock->holder = NULL ; plock->holder_repeat_nr = 0 ; sema_init(&plock->semaphore, 1 ); } void sema_down (struct semaphore* psema) { _intr_status old_status = _disable_intr(); while (psema->value == 0 ) { ASSERT(!elem_find(&psema->waiters, &running_thread()->general_tag)); if (elem_find(&psema->waiters, &running_thread()->general_tag)) { PANIC("sema_down: thread blocked has been in waiters_list\n" ); } list_append(&psema->waiters, &running_thread()->general_tag); thread_block(TASK_BLOCKED); } psema->value--; ASSERT(psema->value == 0 ); _set_intr_status(old_status); } void sema_up (struct semaphore* psema) { _intr_status old_status = _disable_intr(); ASSERT(psema->value == 0 ); if (!list_empty(&psema->waiters)) { struct _task_struct * thread_blocked = elem2entry(struct _task_struct, general_tag, list_pop(&psema->waiters)); thread_unblock(thread_blocked); } psema->value++; ASSERT(psema->value == 1 ); _set_intr_status(old_status); } void lock_acquire (struct lock* plock) { if (plock->holder != running_thread()) { sema_down(&plock->semaphore); plock->holder = running_thread(); ASSERT(plock->holder_repeat_nr == 0 ); plock->holder_repeat_nr = 1 ; }else { plock->holder_repeat_nr++; } } void lock_release (struct lock* plock) { ASSERT(plock->holder == running_thread()); if (plock->holder_repeat_nr > 1 ) { plock->holder_repeat_nr--; return ; } ASSERT(plock->holder_repeat_nr == 1 ); plock->holder = NULL ; plock->holder_repeat_nr = 0 ; sema_up(&plock->semaphore); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void thread_block (enum _task_status stat) { ASSERT(((stat == TASK_BLOCKED) || (stat == TASK_WAITING) || (stat == TASK_HANGING))); _intr_status old_status = _disable_intr(); struct _task_struct * cur_thread = running_thread(); cur_thread->status = stat; task_schedule(); _set_intr_status(old_status); } void thread_unblock (struct _task_struct* pthread) { _intr_status old_status = _disable_intr(); ASSERT(((pthread->status == TASK_BLOCKED) || (pthread->status == TASK_WAITING) || (pthread->status == TASK_HANGING))); if (pthread->status != TASK_READY) { ASSERT(!elem_find(&thread_ready_list, &pthread->general_tag)); if (elem_find(&thread_ready_list, &pthread->general_tag)) { PANIC("thread unblock: blocked thread in ready_list\n" ); } list_push(&thread_ready_list, &pthread->general_tag); pthread->status = TASK_READY; } _set_intr_status(old_status); }
注意我们这里设计了”可重入锁“:什么意思呢?就是某个线程持有锁时,其再次申请锁不会把自己锁死。设想一下某个线程持有锁,它再自己申请一次锁,就陷入了自己等待自己释放锁的死锁状态。所以我们定义一个成员来记录锁持有者获取锁的次数,当次数大于等于0时仅仅增加计数,在释放锁时候也要做对称处理即可:即申请了几次锁也要释放几次锁。
注意到的是,信号量本身的线程安全在这里是通过关闭中断实现的 ,在 单核 cpu 下这确实能保证对信号量的原子操作,如果是多核 cpu 的话,仅能保证在这个核上的安全,如果有其他核访问这个信号量仍然会产生竞态。
如果要支持多核 cpu ,就需要再加一个锁保护信号量本身。可能会想,这不就无限嵌套递归了吗?谁又能保护保护信号量的锁呢?这时候就需要原子操作 了。
回顾竞态的发生,一般都是对于共享资源的操作被其他线程打断了。那么有没有不可以打断的操作呢?
有的有的,就是原子操作 。
执行过程中不会被线程切换或中断打断
多个 CPU 同时操作同一变量时,只会有一个成功
最常见的一个原子操作就是xchg指令:交换寄存器之间或者寄存器和内存的值。这个指令 cpu 在硬件上保证了他是一个原子操作。
我们使用 xchg 指令来实现一个自旋锁 ,来保护信号量本身,就可以支持在多核 cpu 下的线程安全了,同时也不用再专门去关中断。
例如以下实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 struct spinlock { volatile _Bool value; }; struct semaphore { uint8_t value; struct list waiters ; struct spinlock selflock ; }; ... static inline void spinlock_init (struct spinlock *lock) { lock->value = 0 ; } static inline void spin_lock (struct spinlock *lock) { while (1 ) { int old; asm volatile ( "xchg %0, %1" : "=r" (old), "+m" (lock->value) : "0" (1 ) : "memory" ) ; if (old == 0 ) break ; asm volatile ("pause" ) ; } } static inline void spin_unlock (struct spinlock *lock) { asm volatile ( "movl $0, %0" : "+m" (lock->value) : : "memory" ) ;}
但是我们此前很多内容都没有去专门适配多核 cpu,姑且就继续使用关中断,仅考虑单核 cpu 吧。