作業系統實驗七實驗報告
實驗七:同步互斥
練習0:填寫已有實驗
使用meld可以簡單地將前幾個lab的程式碼填入lab7中,但是要注意在這次實驗中,部分程式碼需要做出修改,如下,主要是trap_dispatch
這一個函式
kern/trap/trap.c
中lab6的部分程式碼
...
ticks++;
assert(current != NULL);
run_timer_list(); //lab6中的處理方式是臨時的,lab7開始啟動計時器機制,具體實現在練習1中解釋
break;
...
練習1:理解核心級訊號量的實現和基於核心級訊號量的哲學家就餐問題
1、同步互斥機制的底層實現
計時器
計時器通過定義在kern/schedule/sched.[ch]
中的函式完成,計時器提供了基於時間事件的調節機制,在ucore中利用計時器可以實現基於時間長度的睡眠等待和喚醒機制,每當時鐘中斷髮生時,ucore就可以產生相應的時間事件。計時器相關的資料結構和基本操作如下:
sched.h
中定義了timer_t
的基本資料結構
typedef struct {
unsigned int expires; //the expire time 計時長度
struct proc_struct *proc; //the proc wait in this timer. If the expire time is end, then this proc will be scheduled 該計時器對應的程序
list_entry_t timer_link; //the timer list 計時器連結串列
} timer_t;
timer_init
對計時器初始化
static inline timer_t *
timer_init(timer_t *timer, struct proc_struct *proc, int expires) {
timer->expires = expires; //初始化計時長度
timer->proc = proc; //初始化計時器繫結的程序
list_init(&(timer->timer_link)); //初始化計時器連結串列
return timer;
}
add_timer
向系統新增已初始化的新計時器
void
add_timer(timer_t *timer) {
bool intr_flag;
local_intr_save(intr_flag);
{
assert(timer->expires > 0 && timer->proc != NULL);
assert(list_empty(&(timer->timer_link)));
list_entry_t *le = list_next(&timer_list);
while (le != &timer_list) { //while迴圈的作用在於將一個計時器放入到合適的位置,結合del_timer可以看出,每個計時器實際計時的值為計時佇列在這個計時器之前的計時值之和
timer_t *next = le2timer(le, timer_link);
if (timer->expires < next->expires) { //例如新計時器值為5,已有的計時佇列為2->3->6->7
next->expires -= timer->expires; //則5-2=3,3-3=0,最終新佇列為2->3->0->6->7,顯然當第1、2個計時器都走完時第3個計時器走0步就走完,符合初始值2+3=5
break; //因此呼叫run_timer_list時每次只需要減少第一個計時器的值
}
timer->expires -= next->expires;
le = list_next(le);
}
list_add_before(le, &(timer->timer_link));
}
local_intr_restore(intr_flag);
}
del_timer
取消一個計時器
void
del_timer(timer_t *timer) {
bool intr_flag;
local_intr_save(intr_flag);
{
if (!list_empty(&(timer->timer_link))) {
if (timer->expires != 0) {
list_entry_t *le = list_next(&(timer->timer_link));
if (le != &timer_list) {
timer_t *next = le2timer(le, timer_link);
next->expires += timer->expires; //結合add_timer的機制可以看出,取消後只需要在下一個計時器上加上取消的計時器的當前計時值就可以保證後續每一個計時器的實際計時值都與設定值一致
}
}
list_del_init(&(timer->timer_link));
}
}
local_intr_restore(intr_flag);
}
run_timer_list
更新系統計時並喚醒計時器歸零可以被啟用的程序
void
run_timer_list(void) {
bool intr_flag;
local_intr_save(intr_flag);
{
list_entry_t *le = list_next(&timer_list);
if (le != &timer_list) {
timer_t *timer = le2timer(le, timer_link);
assert(timer->expires != 0);
timer->expires --; //只需要在第一個計時器上減1即可,由於程序加入計時的頻率應遠遠小於時鐘中斷的頻率,這樣設計計時佇列計時值的更新,可以減小開銷避免每次時鐘中斷都要遍歷整個計時器佇列
while (timer->expires == 0) { //當歸零時執行喚醒,由於可能存在後續也為0例如add_timer中註釋舉得例子,用while迴圈將所有歸零的計時器對應的程序啟用
le = list_next(le);
struct proc_struct *proc = timer->proc;
if (proc->wait_state != 0) {
assert(proc->wait_state & WT_INTERRUPTED);
}
else {
warn("process %d's wait_state == 0.\n", proc->pid);
}
wakeup_proc(proc);
del_timer(timer);
if (le == &timer_list) {
break;
}
timer = le2timer(le, timer_link);
}
}
sched_class_proc_tick(current); //執行排程演算法
}
local_intr_restore(intr_flag);
}
遮蔽與使能中斷
中斷的遮蔽與使能通過定義在kern/sync/sync.h
中的函式完成,原始碼較為簡單,基本呼叫關係如下:
關中斷:local_intr_save -> __intr_save -> intr_disable -> cli
開中斷:local_intr_restore -> __intr_restore -> intr_enable -> sti
需要用到中斷相關的操作時按如下格式即可:
...
bool intr_flag;
local_intr_save(intr_flag);
{
critical code...
}
local_intr_restore(intr_flag);
...
等待佇列
等待佇列通過定義在kern/sync/wait.[ch]
中的資料結構和函式完成
wait.h
中定義了等待佇列的基本資料結構
typedef struct {
list_entry_t wait_head;
} wait_queue_t; //wait_queue的頭節點
typedef struct {
struct proc_struct *proc; //與該wait節點繫結的程序指標
uint32_t wakeup_flags; //等待原因標誌
wait_queue_t *wait_queue; //指向此wait節點所屬的wait_queue頭節點
list_entry_t wait_link; //等待佇列連結串列,組織wait節點的連結
} wait_t; //wait節點
wait.c
中定義了等待佇列的基本操作,與連結串列類似,這裡給出介面不分析原始碼
#define le2wait(le, member) //通過連結串列節點獲得wait節點
void wait_init(wait_t *wait, struct proc_struct *proc); //初始化wait結構
void wait_queue_init(wait_queue_t *queue); //初始化wait_queue結構
void wait_queue_add(wait_queue_t *queue, wait_t *wait); //把wait前插到wait_queue中
void wait_queue_del(wait_queue_t *queue, wait_t *wait); //從wait_queue中刪除wait
wait_t *wait_queue_next(wait_queue_t *queue, wait_t *wait); //取wait的後一個wait
wait_t *wait_queue_prev(wait_queue_t *queue, wait_t *wait); //取wait的前一個wait
wait_t *wait_queue_first(wait_queue_t *queue); //取wait_queue的第一個wait
wait_t *wait_queue_last(wait_queue_t *queue); //取wait_queue的最後一個wait
bool wait_queue_empty(wait_queue_t *queue); //判斷wait_queue是否為空
bool wait_in_queue(wait_t *wait); //判斷wait是否在wait_queue中
//以下高層函式基於了上述底層函式實現了相關操作
//喚醒與wait關聯的程序
void wakeup_wait(wait_queue_t *queue, wait_t *wait, uint32_t wakeup_flags, bool del);
//喚醒wait_queue上的第一個wait所關聯的程序
void wakeup_first(wait_queue_t *queue, uint32_t wakeup_flags, bool del);
//喚醒wait_queue上的所有wait所關聯的程序
void wakeup_queue(wait_queue_t *queue, uint32_t wakeup_flags, bool del);
//將wait於當前程序關聯,並讓當前程序所關聯的wait進入wait_queue,即睡眠當前程序
void wait_current_set(wait_queue_t *queue, wait_t *wait, uint32_t wait_state);
//將當前程序關聯的wait從wait_queue刪除
#define wait_current_del(queue, wait)
2、訊號量
有了同步互斥機制的底層支撐,可以實現訊號量。訊號量的原理性描述參考作業系統相關的書籍,如下:
struct semaphore {
int count;
queueType queue;
};
void semWait(semaphore s)
{
s.count--;
if (s.count < 0) {
/* place this process in s.queue */;
/* block this process */;
}
}
void semSignal(semaphore s)
{
s.count++;
if (s.count<= 0) {
/* remove a process P from s.queue */;
/* place process P on ready list */;
}
}
在ucore中,訊號量通過定義在kern/sync/sem.[ch]
中的資料結構和函式實現
sem.h
中定義了訊號量的基本資料結構
typedef struct {
int value; //訊號量的當前值
wait_queue_t wait_queue; //該訊號對應的等待佇列
} semaphore_t;
__down
實現訊號量的P操作
static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state) {
bool intr_flag;
local_intr_save(intr_flag); //關中斷
if (sem->value > 0) { //若訊號量的值大於0,可以獲得訊號量,則減1並開中斷後立刻返回
sem->value --;
local_intr_restore(intr_flag);
return 0;
}
wait_t __wait, *wait = &__wait;
wait_current_set(&(sem->wait_queue), wait, wait_state); //無法獲得訊號量,當前程序需要等待,睡眠並被加入等待佇列
local_intr_restore(intr_flag); //關中斷
schedule(); //排程其他程序執行
local_intr_save(intr_flag); //當被重新喚醒後,將自身的wait從等待佇列中刪除
wait_current_del(&(sem->wait_queue), wait);
local_intr_restore(intr_flag);
if (wait->wakeup_flags != wait_state) { //若喚醒原因與睡眠原因不同,則返回異常標誌,否則返回0
return wait->wakeup_flags;
}
return 0;
}
__up
實現訊號量的V操作
static __noinline void __up(semaphore_t *sem, uint32_t wait_state) {
bool intr_flag;
local_intr_save(intr_flag); //關中斷
{
wait_t *wait;
if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL) {
sem->value ++; //若沒有程序在等待,則訊號量加1
}
else { //有程序在等待,則喚醒
assert(wait->proc->wait_state == wait_state);
wakeup_wait(&(sem->wait_queue), wait, wait_state, 1);
}
}
local_intr_restore(intr_flag); //開中斷並返回
}
3、哲學家就餐問題(訊號量)
5個哲學家圍繞一張圓桌而坐,桌子上每兩個哲學家之間放置1支叉子,共5支;
哲學家的動作包括思考和進餐,進餐時需要同時得到左右兩支叉子,思考時放回叉子;
如何保證哲學家動作有序進行?例如不出現有人永遠拿不到叉子等異常
哲學家就餐問題(體現在kern/sync/check_sync.c
中)利用訊號量來解決,給每個哲學家一個訊號量s[i]
,同時記錄每個哲學家的狀態state_sema[i]
為三種THINKING
、HUNGRY
、EATING
,由於叉子是共享資源,因此在一個哲學家拿起/放下叉子時需要臨界區互斥,用訊號量mutex
來實現。實現如下
//---------- philosophers problem using semaphore ----------------------
int state_sema[N]; //記錄每個哲學家的狀態
semaphore_t mutex; //臨界區互斥
semaphore_t s[N]; //每個哲學家一個訊號量
struct proc_struct *philosopher_proc_sema[N]; //每個哲學家對應1個程序
void phi_test_sema(i) //測試哲學家i是否能進行EATING
{
if(state_sema[i]==HUNGRY&&state_sema[LEFT]!=EATING
&&state_sema[RIGHT]!=EATING) //哲學家i的狀態是HUNGRY且左右都沒有人在吃,則可以進入EATING
{
state_sema[i]=EATING;
up(&s[i]); //由於i已開始EATING,執行V操作
}
}
void phi_take_forks_sema(int i) //獲得兩支叉子
{
down(&mutex); //進入臨界區
state_sema[i]=HUNGRY; //設定哲學家i進入HUNGRY
phi_test_sema(i); //測試能否獲得叉子並進行EATING
up(&mutex); //離開臨界區
down(&s[i]); //注意測試中如果得到了叉子進入EATING則執行了V操作,此時執行P操作配對;若沒有得到叉子而執行P操作就進入堵塞,哲學家持續等待HUNGRY狀態
}
void phi_put_forks_sema(int i) //放回兩支叉子
{
down(&mutex); //進入臨界區
state_sema[i]=THINKING; //設定哲學家i進入THINKING
phi_test_sema(LEFT); //測試一下左鄰居現在是否能進餐
phi_test_sema(RIGHT); //測試一下右鄰居現在是否能進餐
up(&mutex); //離開臨界區
}
int philosopher_using_semaphore(void * arg) //哲學家問題基於訊號量的完整實現過程
{
int i, iter=0;
i=(int)arg;
cprintf("I am No.%d philosopher_sema\n",i);
while(iter++<TIMES) //測試次數TIMES
{
cprintf("Iter %d, No.%d philosopher_sema is thinking\n",iter,i);
do_sleep(SLEEP_TIME);
phi_take_forks_sema(i); //需要兩隻叉子,得不到就堵塞
cprintf("Iter %d, No.%d philosopher_sema is eating\n",iter,i);
do_sleep(SLEEP_TIME);
phi_put_forks_sema(i); //放回兩隻叉子
}
cprintf("No.%d philosopher_sema quit\n",i);
return 0;
}
ucore啟動執行後在init_main
中通過呼叫check_sync
函式來進行模擬測試過程,程式碼及解釋如下:
...
//check semaphore
sem_init(&mutex, 1); //臨界區訊號量初始化為1
for(i=0;i<N;i++){ //迴圈呼叫kernel_thread建立N=4個哲學家程序
sem_init(&s[i], 0);
int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0);
if (pid <= 0) {
panic("create No.%d philosopher_using_semaphore failed.\n");
}
philosopher_proc_sema[i] = find_proc(pid);
set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc");
}
...
練習2:完成核心級條件變數和基於核心級條件變數的哲學家就餐問題
1、管程機制
管程將對共享資源的訪問及所需要的同步操作集中並封裝起來,由四部分組成:
- 管程內部的共享變數
- 管程內部的條件變數
- 管程內部併發執行的過程
- 區域性於管程內部的共享資料設定初始化的語句
侷限在管程中的資料結構,只能被侷限在管程的操作過程所訪問,任何管程之外的操作過程都不能訪問它;另一方面,侷限在管程中的操作過程也主要訪問管程內的資料結構。管程相當於一個隔離區,它把共享變數和對它進行操作的若干個過程圍了起來,所有程序要訪問臨界資源時,都必須經過管程才能進入,而管程每次只允許一個程序進入管程,從而需要確保程序之間互斥。
條件變數(Condition Variables, CV)可以代表一個程序的等待佇列,佇列中的程序都在等待某一個條件,涉及到的核心操作如下:
Wait
操作將自身阻塞在等待佇列中,並喚醒一個等待者或釋放管程的互斥訪問Signal
操作將等待佇列中的一個執行緒喚醒,如果等待佇列為空則等同於空操作
在實際實現管程時,當前程序正在管程中操作,此時等待的某個條件為真時:若當前程序若繼續執行到結束,再檢查條件喚醒等待佇列的程序,稱為Hansen管程;若當前程序立刻放棄管程、進入等待並喚醒這個條件對應的等待佇列的程序,待結束後再重新繼續執行自身程序,稱為Hoare管程
ucore中基於訊號量實現了Hoare管程解決哲學家就餐問題。
2、基於訊號量的管程實現
管程通過定義在kern/sync/monitor.[ch]
中的資料結構和函式實現,如下:
monitor.h
定義了管程的基本資料結構
typedef struct condvar{
semaphore_t sem; //用於發出cond_wait操作而使自身等待條件變數並進入睡眠的訊號量
int count; //條件變數下等待佇列中的程序數
monitor_t * owner; //此條件變數屬於哪個管程
} condvar_t;
typedef struct monitor{
semaphore_t mutex; //只允許一個程序進入管程的訊號量,初始化為1
semaphore_t next; //用來完成同步操作
int next_count; //記錄了由於發出cond_signal操作而睡眠的程序數
condvar_t *cv; //管程中的條件變數
} monitor_t;
monitor_init
實現初始化管程的操作
// Initialize monitor.
void
monitor_init (monitor_t * mtp, size_t num_cv) {
int i;
assert(num_cv>0);
mtp->next_count = 0; //發出cond_signal操作而睡眠的程序數初始為0
mtp->cv = NULL;
sem_init(&(mtp->mutex), 1); //初始化管程互斥鎖為1
sem_init(&(mtp->next), 0); //初始化同步操作的訊號量為0
mtp->cv =(condvar_t *) kmalloc(sizeof(condvar_t)*num_cv);
assert(mtp->cv!=NULL);
for(i=0; i<num_cv; i++){ //迴圈初始化所需要的多個條件變數
mtp->cv[i].count=0; //條件變數的等待佇列程序數初始為0
sem_init(&(mtp->cv[i].sem),0); //條件變數的訊號量初始為0
mtp->cv[i].owner=mtp; //條件變數均屬於正在初始化的管程
}
}
cond_wait
實現Wait
操作
// Suspend calling thread on a condition variable waiting for condition Atomically unlocks
// mutex and suspends calling thread on conditional variable after waking up locks mutex. Notice: mp is mutex semaphore for monitor's procedures
void
cond_wait (condvar_t *cvp) {
//LAB7 EXERCISE1: YOUR CODE
cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
/*
* cv.count ++;
* if(mt.next_count>0)
* signal(mt.next)
* else
* signal(mt.mutex);
* wait(cv.sem);
* cv.count --;
*/
cvp->count ++; //該條件變數下等待佇列的程序數+1
if(cvp->owner->next_count > 0){
up(&(cvp->owner->next)); //若該條件變數所屬的管程有程序在monitor.next訊號量上進入睡眠,則喚醒之
}
else{
up(&(cvp->owner->mutex)); //若沒有,則釋放管程的互斥鎖喚醒無法進入管程的程序
}
down(&(cvp->sem)); //將自身進入睡眠並掛在條件變數的等待佇列中
cvp->count --; //當被喚醒後從這裡開始執行,條件變數的等待佇列程序數減1
cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
}
cond_signal
實現Signal
操作
// Unlock one of threads waiting on the condition variable.
void
cond_signal (condvar_t *cvp) {
//LAB7 EXERCISE1: YOUR CODE
cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
/*
* cond_signal(cv) {
* if(cv.count>0) {
* mt.next_count ++;
* signal(cv.sem);
* wait(mt.next);
* mt.next_count--;
* }
* }
*/
if(cvp->count > 0){ //若該條件下沒有在等待的程序就直接跳過
cvp->owner->next_count ++; //發出cond_signal操作而睡眠的程序數加1
up(&(cvp->sem)); //喚醒該條件變數下等待佇列的程序
down(&(cvp->owner->next)); //將自身進入睡眠,掛在next訊號量上
cvp->owner->next_count --; //當被喚醒後,將掛在next訊號量上的睡眠程序數減1
}
cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
}
注意,為了使管程正常執行,管程中的每個函數出入口都需要相應的操作,如下:
function_in_monitor(...)
{
down(&(monitor.mutex)); //程序進入管程,獲取管程的互斥鎖
//-----------------------------
the real body of function;
//-----------------------------
if(monitor.next_count > 0) //若管程中還有睡眠的程序,喚醒,否則釋放管程互斥鎖
up(&(monitor.next));
else
up(&(monitor.mutex));
}
3、哲學家就餐問題(管程)
哲學家就餐問題(體現在kern/sync/check_sync.c
中)利用管程來解決,給每個哲學家一個條件變數mtp->cv[i]
,同時記錄每個哲學家的狀態state_condvar[i]
為三種THINKING
、HUNGRY
、EATING
,由於叉子是共享資源,用管程來實現。實現如下
//-----------------philosopher problem using monitor ------------
struct proc_struct *philosopher_proc_condvar[N];
int state_condvar[N];
monitor_t mt, *mtp=&mt;
void phi_test_condvar (int i) {
if(state_condvar[i]==HUNGRY&&state_condvar[LEFT]!=EATING
&&state_condvar[RIGHT]!=EATING) {
cprintf("phi_test_condvar: state_condvar[%d] will eating\n",i);
state_condvar[i] = EATING ;
cprintf("phi_test_condvar: signal self_cv[%d] \n",i);
cond_signal(&mtp->cv[i]) ; //可以進入EATING狀態,喚醒在條件變數i下等待的哲學家
}
}
void phi_take_forks_condvar(int i) {
down(&(mtp->mutex));
//--------into routine in monitor--------------
// LAB7 EXERCISE1: YOUR CODE
// I am hungry
// try to get fork
state_condvar[i] = HUNGRY; //設定哲學家i進入HUNGRY狀態
phi_test_condvar(i); //測試能否進入EATING
while(state_condvar[i] != EATING){
cprintf("phi_take_forks_condvar: %d didn't get fork and still wait\n", i);
cond_wait(&mtp->cv[i]); //若沒有進入EATING則將自身睡眠
}
//--------leave routine in monitor--------------
if(mtp->next_count>0) //執行到當前程序最後若管程還有在等待的程序則喚醒,否則釋放管程互斥鎖
up(&(mtp->next));
else
up(&(mtp->mutex));
}
void phi_put_forks_condvar(int i) {
down(&(mtp->mutex));
//--------into routine in monitor--------------
// LAB7 EXERCISE1: YOUR CODE
// I ate over
// test left and right neighbors
state_condvar[i] = THINKING; //設定哲學家i進入THINKING狀態
phi_test_condvar(LEFT); //測試左鄰居能否EATING
phi_test_condvar(RIGHT); //測試右鄰居能否EATING
//--------leave routine in monitor--------------
if(mtp->next_count>0) //執行到當前程序最後若管程還有在等待的程序則喚醒,否則釋放管程互斥鎖
up(&(mtp->next));
else
up(&(mtp->mutex));
}
//---------- philosophers using monitor (condition variable) ----------------------
int philosopher_using_condvar(void * arg) {
int i, iter=0;
i=(int)arg;
cprintf("I am No.%d philosopher_condvar\n",i);
while(iter++<TIMES)
{
cprintf("Iter %d, No.%d philosopher_condvar is thinking\n",iter,i);
do_sleep(SLEEP_TIME);
phi_take_forks_condvar(i);
cprintf("Iter %d, No.%d philosopher_condvar is eating\n",iter,i);
do_sleep(SLEEP_TIME);
phi_put_forks_condvar(i);
}
cprintf("No.%d philosopher_condvar quit\n",i);
return 0;
}
ucore啟動執行後在init_main
中通過呼叫check_sync
函式來進行模擬測試過程,程式碼及解釋如下:
//check condition variable
monitor_init(&mt, N);
for(i=0;i<N;i++){
state_condvar[i]=THINKING;
int pid = kernel_thread(philosopher_using_condvar, (void *)i, 0);
if (pid <= 0) {
panic("create No.%d philosopher_using_condvar failed.\n");
}
philosopher_proc_condvar[i] = find_proc(pid);
set_proc_name(philosopher_proc_condvar[i], "philosopher_condvar_proc");
}
總結
完成全部程式碼後,呼叫make grade
可以獲得如下輸出,說明實驗成功
badsegment: (4.7s)
-check result: OK
-check output: OK
divzero: (2.8s)
-check result: OK
-check output: OK
softint: (2.8s)
-check result: OK
-check output: OK
faultread: (1.7s)
-check result: OK
-check output: OK
faultreadkernel: (1.6s)
-check result: OK
-check output: OK
hello: (3.3s)
-check result: OK
-check output: OK
testbss: (1.7s)
-check result: OK
-check output: OK
pgdir: (3.3s)
-check result: OK
-check output: OK
yield: (2.8s)
-check result: OK
-check output: OK
badarg: (3.3s)
-check result: OK
-check output: OK
exit: (2.8s)
-check result: OK
-check output: OK
spin: (2.8s)
-check result: OK
-check output: OK
waitkill: (4.0s)
-check result: OK
-check output: OK
forktest: (2.9s)
-check result: OK
-check output: OK
forktree: (2.9s)
-check result: OK
-check output: OK
priority: (15.7s)
-check result: OK
-check output: OK
sleep: (11.5s)
-check result: OK
-check output: OK
sleepkill: (2.9s)
-check result: OK
-check output: OK
matrix: (10.9s)
-check result: OK
-check output: OK
Total Score: 190/190