作業系統真相還原 第十章 輸入輸出系統
第十章 輸入輸出系統
同步機制-鎖
找出程式碼中的臨界區、互斥、競爭條件
臨界區:多個任務訪問同一資源時,每個任務中訪問資源的指令程式碼被稱為臨界區,臨界區是程式碼,並不是資源。
互斥:也可稱為排他,是指某一時刻公共資源只能被一個任務獨享。
競爭條件:指多個任務以非互斥的方式同時進入臨界區,大家對公共資源的訪問是以競爭的方式並行進行的。
多執行緒訪問公共資源時出問題的原因是產生了競爭條件,也就是多個任務同時出現在自己的臨界區。為避免產生競爭條件,必須保證任意時刻只能有1個任務處於臨界區。因此,只要保證各執行緒自己臨界區中的所有程式碼都是原子操作,即臨界區中的指令要麼一條不做,要麼 氣呵成全部執行完,執行期間絕對不能被換下處理器。
其實,之所以出現競爭條件,歸根結底是因為臨界區中的指令太多了,如果臨界區僅有一條指令的話,這本身已屬於原子操作,完全不需要互斥。因此,在臨界區中指令多於一條時才需要互斥。當然,臨界區中很少存在只有一條指令的情況,因此我們必須提供一種互斥的機制,互斥能使臨界區具有原子性,避免產生競爭條件,從而避免了多工訪問公共資源時出問題。
訊號量
互斥可以使用硬體機制關中斷的方式實現,但是如果臨界區範圍太大,使用關中斷方式會影響併發。可使用鎖的方式來實現互斥,不過鎖中可以也要藉助關中斷來做原子操作,不過這個關中斷的範圍較小。
訊號量:
用於實現鎖。在計算機中,訊號量就是個0以上的整數值,當為0時表示己無可用訊號 ,或者說條件不再允許,因它表示某種訊號的累積“量“,故稱為訊號量。訊號量就是個計數器,它的計數值是自然數,用來記錄所積累訊號的數量。訊號是個泛指,取決於訊號量的實際應用環境。訊號的意義取決於您用訊號來做什麼,訊號量僅僅是一種程式設計構造方法。
兩個操作:
增加操作 up:
- 將訊號量的值加1。
- 喚醒在此訊號量上等待的執行緒。
減少操作 down:
- 判斷訊號量是否大於0。
- 若訊號量大於0,則將訊號減1。
- 若訊號量等於0,當前執行緒將自己阻塞,在此訊號量上等待。
訊號量是個全域性共享變數,up和down又都是讀寫這個全域性變數的操作,而且它們都包含一系列的子操作,因此它們必須都是原子操作。
如果訊號量取值只為0和1,便稱為二元訊號量,我們可以利用二元訊號量來實現鎖。
二元訊號量中, down 操作就是獲得鎖, up 操作就是釋放鎖 我們可以讓執行緒通過鎖進入臨界區,可以藉此保證只有一個執行緒可以進入臨界區,從而做到互斥,大致流程為:
• 執行緒A進入臨界區前先通過 down 操作獲得鎖,此時訊號量的值便為0。
• 後續執行緒B再進入臨界區時也通過 down 操作獲得鎖,由於訊號量為0,執行緒B便在此訊號量上等待,也就是相當於執行緒B進入了睡眠態。
• 當執行緒A從臨界區出來後執行 up 操作釋放鎖,此時訊號量的值重新變成1 ,之後執行緒A將執行緒B喚醒。
• 執行緒B醒來後獲得了鎖,進入臨界區。
執行緒的阻塞與喚醒
阻塞
阻塞:執行緒執行的原理是排程器從就緒佇列中取出執行緒上cpu,如果不把執行緒放入就緒佇列,就可以實現阻塞。(阻塞並不一定跟鎖有關,阻塞並不一定跟鎖有關,阻塞並不一定跟鎖有關)
阻塞是本執行緒自己發出的動作,自己阻塞自己,阻塞是主動的。而已阻塞的執行緒需要別人來喚醒,喚醒是被動的。
thread_block,當前執行緒將自己阻塞,呼叫schedule()函式將當前執行緒換下處理器,schedule函式將狀態為TASK_RUNNING的執行緒再次加入到就緒佇列,狀態為阻塞狀態的執行緒不會加入到就緒佇列。
/* 當前執行緒將自己阻塞,標誌其狀態為stat. */
void thread_block(enum task_status stat) {
/* stat取值為TASK_BLOCKED,TASK_WAITING,TASK_HANGING,也就是隻有這三種狀態才不會被排程*/
ASSERT(((stat == TASK_BLOCKED) || (stat == TASK_WAITING) || (stat == TASK_HANGING)));
enum intr_status old_status = intr_disable();
struct task_struct* cur_thread = running_thread();
cur_thread->status = stat; // 置其狀態為stat
schedule(); // 將當前執行緒換下處理器
/* 待當前執行緒被解除阻塞後才繼續執行下面的intr_set_status */
intr_set_status(old_status);
}
/* 實現任務排程 */
void schedule() {
ASSERT(intr_get_status() == INTR_OFF);
struct task_struct* cur = running_thread();
if (cur->status == TASK_RUNNING) { // 若此執行緒只是cpu時間片到了,將其加入到就緒佇列尾
ASSERT(!elem_find(&thread_ready_list, &cur->general_tag));
list_append(&thread_ready_list, &cur->general_tag);
cur->ticks = cur->priority; // 重新將當前執行緒的ticks再重置為其priority;
cur->status = TASK_READY;
} else {
/* 若此執行緒需要某事件發生後才能繼續上cpu執行,
不需要將其加入佇列,因為當前執行緒不在就緒佇列中。*/
}
ASSERT(!list_empty(&thread_ready_list));
thread_tag = NULL; // thread_tag清空
/* 將thread_ready_list佇列中的第一個就緒執行緒彈出,準備將其排程上cpu. */
thread_tag = list_pop(&thread_ready_list);
struct task_struct* next = elem2entry(struct task_struct, general_tag, thread_tag);
next->status = TASK_RUNNING;
switch_to(cur, next);
}
需要藉助關中斷實現原子操作,關了中斷就不響應時鐘中斷了,也就沒有執行緒切換了,相當於單執行緒執行。
/* 關中斷,並且返回關中斷前的狀態 */
enum intr_status intr_disable() {
enum intr_status old_status;
if (INTR_ON == intr_get_status()) {
old_status = INTR_ON;
asm volatile("cli" : : : "memory"); // 關中斷,cli指令將IF位置0
return old_status;
} else {
old_status = INTR_OFF;
return old_status;
}
}
/* 將中斷狀態設定為status */
enum intr_status intr_set_status(enum intr_status status) {
return status & INTR_ON ? intr_enable() : intr_disable();
}
喚醒
喚醒是被動的。
thread_unblock,它將某執行緒解除阻塞,也就是喚醒某執行緒。被阻塞的執行緒己無法執行,無法自己喚醒自己,必須被其他執行緒喚醒,因此引數 pthread 指向的是目前已經被阻塞,又希望被喚醒的執行緒。函式 thread unblock 是由當前執行的執行緒呼叫的,由它實施喚醒動作,它是被阻塞執行緒 pthread 的“救世主”。list_push,將被阻塞執行緒放到就緒佇列中,放在佇列的頭部,使其儘快得到排程,狀態改為TASK_READY。
/* 將執行緒pthread解除阻塞 */
void thread_unblock(struct task_struct* pthread) {
enum intr_status old_status = intr_disable();
ASSERT(((pthread->status == TASK_BLOCKED) || (pthread->status == TASK_WAITING) || (pthread->status == TASK_HANGING)));
if (pthread->status != TASK_READY) {
ASSERT(!elem_find(&thread_ready_list, &pthread->general_tag));
if (elem_find(&thread_ready_list, &pthread->general_tag)) {
PANIC("thread_unblock: blocked thread in ready_list\n");
}
list_push(&thread_ready_list, &pthread->general_tag); // 放到佇列的最前面,使其儘快得到排程
pthread->status = TASK_READY;
}
intr_set_status(old_status);
}
鎖的實現
訊號量的實現
訊號量結構
/* 訊號量結構體 */
struct semaphore {
uint8_t value;
struct list waiters;
};
初始化
/* 初始化訊號量 */
void sema_init(struct semaphore* psema, uint8_t value) {
psema->value = value; // 為訊號量賦初值
list_init(&psema->waiters); //初始化訊號量的等待佇列
}
訊號量down操作
- 判斷訊號量是否大於0。
- 若訊號量大於0,則將訊號減1。
- 若訊號量等於0,當前執行緒將自己阻塞,在此訊號量上等待。
判斷訊號量是否為0,應使用while而非if。
訊號量為0,thread_block阻塞當前執行緒,被喚醒後從thread_block下繼續執行。喚醒後也有可能有新的執行緒建立競爭訊號量並且先一步獲取到訊號量(從呼叫sema_down函式到函式執行第一條語句關中斷前,也可能會有其他執行緒拿到訊號量),如果使用if會出現對已經為0的訊號量減一,使用while可以再次判斷訊號量。
/* 訊號量down操作 */
void sema_down(struct semaphore* psema) {
/* 關中斷來保證原子操作 */
enum intr_status old_status = intr_disable();
while(psema->value == 0) { // 若value為0,表示已經被別人持有
ASSERT(!elem_find(&psema->waiters, &running_thread()->general_tag));
/* 當前執行緒不應該已在訊號量的waiters佇列中 */
if (elem_find(&psema->waiters, &running_thread()->general_tag)) {
PANIC("sema_down: thread blocked has been in waiters_list\n");
}
/* 若訊號量的值等於0,則當前執行緒把自己加入該鎖的等待佇列,然後阻塞自己 */
list_append(&psema->waiters, &running_thread()->general_tag);
thread_block(TASK_BLOCKED); // 阻塞執行緒,直到被喚醒
}
/* 若value為1或被喚醒後,會執行下面的程式碼,也就是獲得了鎖。*/
psema->value--;
ASSERT(psema->value == 0);
/* 恢復之前的中斷狀態 */
intr_set_status(old_status);
}
訊號量的up操作
增加操作 up:
- 將訊號量的值加1。
- 喚醒在此訊號量上等待的執行緒。從waiters中取第一個,即每次只喚醒一個。
/* 訊號量的up操作 */
void sema_up(struct semaphore* psema) {
/* 關中斷,保證原子操作 */
enum intr_status old_status = intr_disable();
ASSERT(psema->value == 0);
if (!list_empty(&psema->waiters)) {
struct task_struct* thread_blocked = elem2entry(struct task_struct, general_tag, list_pop(&psema->waiters));
thread_unblock(thread_blocked);
}
psema->value++;
ASSERT(psema->value == 1);
/* 恢復之前的中斷狀態 */
intr_set_status(old_status);
}
鎖的實現
鎖結構
成員holder_repeat_nr用來累積鎖的持有者重複申請鎖的次數,釋放鎖的時候會參考此變數的值。原因是一般情況下我們應該在進入臨界區之前加鎖,但有時候可能持有了某臨界區的鎖後,在未釋放鎖之前,有可能會再次呼叫重複申請此鎖的函式,這樣一來,內外層函式在釋放鎖時會對同一個鎖釋放兩次。(可重入)
/* 鎖結構 */
struct lock {
struct task_struct* holder; // 鎖的持有者,執行緒
struct semaphore semaphore; // 用二元訊號量實現鎖
uint32_t holder_repeat_nr; // 鎖的持有者重複申請鎖的次數
};
初始化
/* 初始化鎖plock */
void lock_init(struct lock* plock) {
plock->holder = NULL;
plock->holder_repeat_nr = 0;
sema_init(&plock->semaphore, 1); // 訊號量初值為1
}
獲取鎖
lock_acquire,如果是鎖的擁有者,次數加1,可重入。如果不是鎖的擁有者,執行訊號量減操作,如果成功,鎖的擁有者設定為當前執行緒,次數為1。
/* 獲取鎖plock */
void lock_acquire(struct lock* plock) {
if (plock->holder != running_thread()) {
sema_down(&plock->semaphore); // 對訊號量P操作,原子操作
plock->holder = running_thread();
ASSERT(plock->holder_repeat_nr == 0);
plock->holder_repeat_nr = 1;
} else {
/* 排除曾經自己已經持有鎖但還未將其釋放的情況。*/
plock->holder_repeat_nr++;
}
}
釋放鎖
lock_release,鎖的持有者設定為null,次數為0,執行訊號量加操作。
/* 釋放鎖plock */
void lock_release(struct lock* plock) {
ASSERT(plock->holder == running_thread());
if (plock->holder_repeat_nr > 1) {
plock->holder_repeat_nr--;
return;
}
ASSERT(plock->holder_repeat_nr == 1);
plock->holder = NULL; // 把鎖的持有者置空放在V操作之前
plock->holder_repeat_nr = 0;
sema_up(&plock->semaphore); // 訊號量的V操作,也是原子操作
}
環形輸入緩衝區
輸入單個字元沒有實際用途,一般需要輸入多個字元,以回車鍵結束,然後解析完整的一串字元。所以需要一個緩衝區暫存輸入,回車後一起處理。
生產者與消費者問題簡述
執行緒同步:同步指按預定的先後次序進行執行,指多個執行緒相互協作,共同完成一個任務,屬於執行緒間工作步調的相互制約。生產者消費者是執行緒同步的經典例子。
生產者與消費者問題是描述多個執行緒協同工作的模型,當初是由荷蘭Dijkstra 為演示訊號量而提出的,訊號量解決了協同工作中的“同步”和“互斥”。
生產者消費者問題:對於有限大小的公共緩衝區,如何同步生產者與消費者的執行,以達到對共同緩衝區的互斥訪問,並且保證生產者不會過度生產,消費者不會過度消費,緩衝區不會被破壞。
環形緩衝區的實現
環形緩衝區是個線性佇列,首尾相連可實現環形。
lock 是本緩衝區的鎖,每次對緩衝區操作時都要先申請這個鎖,從而保證緩衝區操作互斥。
producer 是生產者,此項來記錄當緩衝區滿時,在此緩衝區睡眠的生產者執行緒。
consumer 是消費者,此項來記錄當緩衝區空時,在此緩衝區睡眠的消費者執行緒。
buf[bufsize] 是定義的緩衝區陣列,其大小為 bufsize。
head 是緩衝區佇列的隊首地址, tail 是隊尾地址
/* 環形佇列 */
struct ioqueue {
// 生產者消費者問題
struct lock lock;
/* 生產者,緩衝區不滿時就繼續往裡面放資料,
* 否則就睡眠,此項記錄哪個生產者在此緩衝區上睡眠。*/
struct task_struct* producer;
/* 消費者,緩衝區不空時就繼續從往裡面拿資料,
* 否則就睡眠,此項記錄哪個消費者在此緩衝區上睡眠。*/
struct task_struct* consumer;
char buf[bufsize]; // 緩衝區大小
int32_t head; // 隊首,資料往隊首處寫入
int32_t tail; // 隊尾,資料從隊尾處讀出
};
/* 初始化io佇列ioq */
void ioqueue_init(struct ioqueue* ioq) {
lock_init(&ioq->lock); // 初始化io佇列的鎖
ioq->producer = ioq->consumer = NULL; // 生產者和消費者置空
ioq->head = ioq->tail = 0; // 佇列的首尾指標指向緩衝區陣列第0個位置
}
/* 返回pos在緩衝區中的下一個位置值 */
static int32_t next_pos(int32_t pos) {
return (pos + 1) % bufsize;
}
/* 判斷佇列是否已滿 */
bool ioq_full(struct ioqueue* ioq) {
ASSERT(intr_get_status() == INTR_OFF);
return next_pos(ioq->head) == ioq->tail;
}
/* 判斷佇列是否已空 */
static bool ioq_empty(struct ioqueue* ioq) {
ASSERT(intr_get_status() == INTR_OFF);
return ioq->head == ioq->tail;
}
ioq_putchar
生產者寫入字元
- 如果緩衝區已滿,獲取緩衝區的鎖(緩衝區的鎖,每次對緩衝區操作時都要先申請這個鎖,從而保證緩衝區操作互斥。),然後阻塞該執行緒(阻塞並不一定跟鎖有關),被喚醒後才能釋放緩衝區的鎖。緩衝區滿的時候才獲取鎖。
- 如果緩衝區未滿,寫入字元。如果有消費者在等待,喚醒消費者。
/* 生產者往ioq佇列中寫入一個字元byte */
void ioq_putchar(struct ioqueue* ioq, char byte) {
ASSERT(intr_get_status() == INTR_OFF);
/* 1.若緩衝區(佇列)已經滿了,獲取鎖
* 2.把生產者ioq->producer記為自己(為的是當緩衝區裡的東西被消費者取完後讓消費者知道喚醒哪個生產者),阻塞當前執行緒
* 3.被消費者喚醒後,釋放鎖
*/
while (ioq_full(ioq)) {
lock_acquire(&ioq->lock);
ioq_wait(&ioq->producer);
lock_release(&ioq->lock);
}
ioq->buf[ioq->head] = byte; // 把位元組放入緩衝區中
ioq->head = next_pos(ioq->head); // 把寫遊標移到下一位置
if (ioq->consumer != NULL) {
wakeup(&ioq->consumer); // 喚醒消費者
}
}
/* 使當前生產者或消費者在此緩衝區上等待 */
static void ioq_wait(struct task_struct** waiter) {
ASSERT(*waiter == NULL && waiter != NULL);
*waiter = running_thread();
thread_block(TASK_BLOCKED);
}
/* 喚醒waiter */
static void wakeup(struct task_struct** waiter) {
ASSERT(*waiter != NULL);
thread_unblock(*waiter);
*waiter = NULL;
}
消費者讀取字元
/* 消費者從ioq佇列中獲取一個字元 */
char ioq_getchar(struct ioqueue* ioq) {
ASSERT(intr_get_status() == INTR_OFF);
/* 1.若緩衝區(佇列)為空,獲取鎖
* 2.把消費者ioq->consumer記為當前執行緒自己(目的是將來生產者往緩衝區裡裝商品後,生產者知道喚醒哪個消費者),阻塞當前執行緒
* 3.被生產者喚醒後,釋放鎖
* */
while (ioq_empty(ioq)) {
lock_acquire(&ioq->lock);
ioq_wait(&ioq->consumer);
lock_release(&ioq->lock);
}
char byte = ioq->buf[ioq->tail]; // 從緩衝區中取出
ioq->tail = next_pos(ioq->tail); // 把讀遊標移到下一位置
if (ioq->producer != NULL) {
wakeup(&ioq->producer); // 喚醒生產者
}
return byte;
}
總結:緩衝區滿的時候或空的時候(到達邊界值),第一步獲取鎖,第二步阻塞當前執行緒,讓出cpu,等待其他執行緒工作後喚醒阻塞執行緒。