0x00 在内核实现初步的线程机制
0x01 什么是线程 线程就是程序中的一条执行流 ,也就是代码从上到下运行的一条路径。在一个程序里,可以同时存在多条执行流,每一条执行流就是一个线程,它们可以并发地执行不同的代码。
为了支持并发执行,每个线程都有独立的寄存器上下文和栈,用于维护自身的执行状态;但线程之间共享进程的地址空间,包括堆、全局变量和代码段等资源。这也是线程与进程的重要区别之一。
进程?什么是进程:进程是资源分配的单位,包含一个或多个线程,也就是说进程 = 资源 + 线程。
线程是 cpu 调度的单位,进程是资源分配的单位并包含了线程。
0x02 线程调度机制 0x01 如何控制线程 用一块内存来存储线程的执行信息,OS 才认识“线程”。这块内存称为程序控制块:对于进程用 PCB (包括资源和执行信息),对于线程用 TCB (只有执行信息)。如下图:
我们实现的时候为了方便统一用一个类似 PCB 的 task_struct 来作为程序控制块,给线程用的时候不分配资源即可。
既然要调度线程,也就是调度执行流,肯定要自己控制栈:毕竟栈天然地和执行流绑定。自己控制栈最方便的方法就是自己定义一个结构体在一块内存中,要使用的时候令 esp 指向它即可。这样便可以完全控制一个栈帧,也就完全控制了一个执行流。(毕竟 call ret 等指令都和栈息息相关)。
为什么说完全控制?当我们想执行栈顶某个地址的函数时,直接执行 ret 即可,ret 翻译一下就是 pop eip。同时因为整个栈帧的内容都由我们自己控制,参数和返回地址自然也随便控制了。
以下是相关的结构定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 enum _task_status { TASK_RUNNING, TASK_READY, TASK_BLOCKED, TASK_WAITING, TASK_HANGING, TASK_DIED }; struct _intr_stack { uint32_t vec; uint32_t edi; uint32_t esi; uint32_t ebp; uint32_t esp_dummy; uint32_t ebx; uint32_t edx; uint32_t ecx; uint32_t eax; uint32_t gs; uint32_t fs; uint32_t es; uint32_t ds; uint32_t err_code; void (*eip) (void ); uint32_t cs; uint32_t eflags; void * esp; uint32_t ss; }; struct _thread_stack { uint32_t ebx; uint32_t edi; uint32_t esi; uint32_t ebp; void (*eip)(thread_func* func, void * func_arg); void (*unused_retaddr); thread_func* function; void * func_arg; }; struct _task_struct { uint32_t * self_kstack; enum _task_status status ; char name[16 ]; uint8_t priority; uint8_t ticks; uint32_t elapsed_ticks; struct list_elem general_tag ; struct list_elem all_list_tag ; uint32_t * pgdir; uint32_t stack_magic; };
在创建一个线程时,先申请一页内存放一个_task_struct,然后再初始化好其中的相关内容即可。
具体如何调度线程,也就是如何切换到某个线程来执行呢?
如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 [bits 32] section .text global switch_task switch_task: ;栈中此处是返回地址 ; 注意这里入栈要和 _thread_stack 中顺序一致 push ebp mov ebp, esp push esi push edi push ebx mov eax, [ebp + 8] ;得到栈中的参数cur mov [eax], esp ;保存栈顶指针esp,task_struct的self_kstack字段 ;self_kstack在task_struct中的偏移为0 ;所以直接往thread开头处存4字节即可 ; =========== 以上是备份当前线程的环境,下面是恢复下一个线程的环境 mov eax, [ebp + 12] ;栈中的参数next mov esp, [eax] ;恢复下一个线程的栈顶 pop ebx ; 从next线程栈中恢复其寄存器, 下同 pop edi pop esi pop ebp ret ;返回到next线程上次压入的返回地址 ;如果此时的 next 线程之前尚未执行过,第一次执行,此时栈顶的值是初始化时设置的 kernel_thread 函数来调用指定定函数
这个汇编实现了一个函数 switch_task(cur, next);,从线程 cur 切换到线程 next。这里使用的栈就是我们自己定义的_thread_stack 结构体。cur 或者 next 指针指向的就是_task_struct,其第一个成员就是线程栈顶指针,这个函数先保存好寄存器和cur的栈顶指针,然后把栈切换到next的线程栈,从中恢复寄存器,然后调用ret,此时
1 2 3 4 5 6 7 8 ebx edi esi ebp eip ← ret unused_retaddr ← 返回地址 function ← 参数1 func_arg ← 参数2
这样就跳转到了我们在next的线程栈中布置好的需要执行的函数地址。我们专门设置一个函数如下
1 2 3 4 5 6 7 static void kernel_thread (thread_func* function, void * func_arg) { _enable_intr(); function(func_arg); }
他来执行不同线程特定的function。也就是说初始化一个线程的时候在把的线程栈的eip成员赋值为这个函数。
0x02 调度线程的策略 使用双向链表来实现一个任务队列,来调度线程。
注意这里使用“侵入式”双向链表。
即链表结点只有两个指针成员,没有数据。通过把结点放入目标结构中作为其一部分实现连接。如下图
这里任务队列可以实现一个优先队列,决定调度执行哪个线程。我们姑且使用最简单的队列来调度,就按顺序一个一个来。
0x03 调度线程的过程 操作系统通过时钟中断来调度线程。
假设有两个线程,cur_thread 和 next_thread ,发生时钟中断,从前者调度到后者,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 static void _timer_intr_handler(void ){ struct _task_struct * cur_thread = running_thread(); ASSERT(cur_thread->stack_magic == 0xdeadbeef ); cur_thread->elapsed_ticks++; ticks++; if (cur_thread->ticks == 0 ) { task_schedule(); }else { cur_thread->ticks--; } } ... void task_schedule (void ) { ASSERT(_get_intr_status() == INTR_OFF); struct _task_struct * cur = running_thread(); if (cur->status == TASK_RUNNING) { ASSERT(!elem_find(&thread_ready_list, &cur->general_tag)); list_append(&thread_ready_list, &cur->general_tag); cur->ticks = cur->priority; cur->status = TASK_READY; }else { } ASSERT(!list_empty(&thread_ready_list)); thread_tag = NULL ; thread_tag = list_pop(&thread_ready_list); struct _task_struct * next = elem2entry(struct _task_struct, general_tag, thread_tag); next->status = TASK_RUNNING; switch_task(cur, next); }
复习一下我们实现的中断机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 global _intr_entry_table _intr_entry_table: %macro VECTOR 2 ;这里定义一个多行宏作为 intr_handler,后面的2是指传递两个参数,里面的%1,%2代表参数 section .text intr%1entry: ; ============= 默认压入错误码或者空操作,保证最后都会需要跨过一个4字节来进行iret %2 ; ============= 保存上下文 push ds push es push fs push gs pushad ;压入通用寄存器 ; ============= 发送中断结束信号 mov al, 0x20 ; 中断结束命令EOI,这里R为0,SL为0,EOI为1 mov dx, 0xa0 ; 中断控制器从片端口 out dx, al ; 向从片发送 mov dx, 0x20 ; 中断控制器主片端口 out dx, al ; 向主片发送 ; ============= 压入中断向量号并调用中断处理函数 push %1 call [IntrHandlerTable + %1*4] jmp intr_exit ; ============= intr_entry_table数组(编译后会将相同的section合并成一个段) section .data dd intr%1entry ;存储各个中断入口程序的地址, %endmacro ;多行宏结束标志
会压入大量的上下文和一些中断信息,再跳转到这里之前还会压入eip、cs、eflags、esp和ss寄存器。这就是我们此前定义的_intr_stack发挥用处的地方了,这里中断处理程序压入的东西正好填充在我们定义的_intr_stack。看如下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void thread_init_stack (struct _task_struct* pthread, thread_func function, void * func_arg) { pthread->self_kstack = (uint32_t *)((uint8_t *)pthread->self_kstack - sizeof (struct _intr_stack)); pthread->self_kstack = (uint32_t *)((uint8_t *)pthread->self_kstack - sizeof (struct _thread_stack)); struct _thread_stack * kthread_stack = (struct _thread_stack*)pthread->self_kstack; kthread_stack->eip = kernel_thread; kthread_stack->function = function; kthread_stack->func_arg = func_arg; kthread_stack->ebp = kthread_stack->ebx = kthread_stack->esi = kthread_stack->edi = 0 ; }
初始化一个线程栈的时候预留了中断栈的空间!
中断栈填满后就到了_thread_stack也就是线程栈
执行 switch_task(cur, next);,翻译成汇编就是
1 2 3 push next_thread push cur_thread call switch_task
再展开一点就是
1 2 3 4 push next_thread push cur_thread push retaddr jmp switch_task
然后switch_task开头会保存那几个寄存器,此时 cur 线程的线程栈就被保存为:
1 2 3 4 5 6 7 8 ebx -- ebx 成员 (栈顶) edi -- edi 成员 esi -- esi 成员 ebp -- ebp 成员 switch_task_retaddr -- eip 成员,是switch_task返回地址 next_thread -- unused_retaddr 成员 cur_thread -- function 成员 task_schedule_retaddr -- func_arg 成员
然后执行到mov esp, [eax]之后就切换到了next_thread的栈, 也是如下结构
1 2 3 4 5 6 7 8 ebx edi esi ebp eip ← ret unused_retaddr ← 返回地址 function ← 参数1 func_arg ← 参数2
会ret 跳转到 eip 成员。此时eip成员的值有两种情况:
我们就认为是第二种情况的话,其栈帧就会和
1 2 3 4 5 6 7 8 ebx -- ebx 成员 (栈顶) edi -- edi 成员 esi -- esi 成员 ebp -- ebp 成员 switch_task_retaddr -- eip 成员,是switch_task返回地址 next_thread -- unused_retaddr 成员 cur_thread -- function 成员 task_schedule_retaddr -- func_arg 成员
一致,那么此时会跳转到switch_task返回地址返回到task_schedule的结束部分,然后跳转到task_schedule返回地址,也就返回到_timer_intr_handler时钟中断处理程序的结束部分,最后
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 call [IntrHandlerTable + %1*4] jmp intr_exit ; ============= intr_entry_table数组(编译后会将相同的section合并成一个段) section .data dd intr%1entry ;存储各个中断入口程序的地址, %endmacro ;多行宏结束标志 section .text global intr_exit intr_exit: ; =========== 恢复上下文 add esp, 4 ; 跳过中断号 popad pop gs pop fs pop es pop ds add esp, 4 ; 跳过error_code ; =========== 返回 iretd
如上所示从中断中返回,也就是说上次被时钟中断调度的时候执行到哪,这次就回到那里继续执行!非常完美。
我们执行一下看看呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 #include "kernel/print.h" #include "init.h" #include "debug.h" #include "interrupt.h" #include "memory.h" #include "thread.h" void tA (void *) ;void tB (void *) ;int main (void ) { print("kernel made by r3t2\n" ); init(); char * strA = "TA" ; char * strB = "TB" ; thread_create("testA" , 31 , tA, strA); thread_create("testB" , 8 , tB, strB); _enable_intr(); while (1 ) { print("main thread" ); } return 0 ; } void tA (void * arg) { char * str = arg; print(str); } void tB (void * arg) { char * str = arg; print(str); }
如上所示,使用 qemu 执行一下
缺页中断!(我优化了一下默认中断处理程序会输出中断类型,暂时还没有实现缺页中断的处理)
为什么?试想一下,如果一个线程被调度执行,执行完毕之后呢?
也就是说kernel_thread(thread_func* function, void* func_arg)这个函数执行完成了,需要返回,此时栈上的返回地址是:unused_retaddr。会返回到未设置的地方!这里的缺页地址正是一个典型的未初始化的垃圾值。所以说还需要一个给执行完毕的线程“处理后事”的函数,来让线程正确的结束退出。#todo。
所以暂时在kernel_thread(thread_func* function, void* func_arg)返回前加一个死循环,后面实现一个线程退出函数再去掉即可。或者把几个线程执行的函数后面加上死循环。再测试就正常了。
我们好像少考虑了什么…线程安全!
实现线程怎么能不考虑线程安全呢?按照参考书说法,这里再继续执行一会就会触发 #GP General Protection Exception 这个中断,因为我们实现的打印函数不是线程安全的:对于显存、硬件的光标寄存器和全局的打印内容缓冲区都没有保护。
为了保证打印操作的原子性,在打印前关中断避免其他线程的调度,打印后开中断恢复,就可以了。但这只是权宜之计。
后续应该实现锁的机制来保证线程安全#todo