1. 程式人生 > >LDD-Time, Delays, and Deferred Work

LDD-Time, Delays, and Deferred Work

Measuring Time Lapses
系統用來計時的變數是jiffies_64,在系統啟動時會初始化為0,相關的函式都定義在linux/jiffies.h中;相關的巨集定義還有HZ。下列程式碼是jiffies和HZ的使用方法:
1 #include <linux/jiffies.h>
2 unsigned long j, stamp_1, stamp_half, stamp_n;
3 j = jiffies; /* read the current value */
4 stamp_1 = j + HZ; /* 1 second in the future 
*/ 5 stamp_half = j + HZ/2; /* half a second */ 6 stamp_n = j + n * HZ / 1000; /* n milliseconds */

要判斷兩個時間點的先後關係,可以採用下列方法:

1 #include <linux/jiffies.h>
2 int time_after(unsigned long a, unsigned long b);
3 int time_before(unsigned long a, unsigned long b);
4 int time_after_eq(unsigned long
a, unsigned long b); 5 int time_before_eq(unsigned long a, unsigned long b);

使用者程式用來表示時間的資料結構是struct timeval,struct timespec。前者包含兩個數,分別代表秒和毫秒;後者包含兩個數,分別代表秒和納秒。相關的函式定義如下:

1 #include <linux/time.h>
2 unsigned long timespec_to_jiffies(struct timespec *value);
3 void
jiffies_to_timespec(unsigned long jiffies, struct timespec *value); 4 unsigned long timeval_to_jiffies(struct timeval *value); 5 void jiffies_to_timeval(unsigned long jiffies, struct timeval *value);
每個CPU平臺一般都包含一個暫存器,每個時鐘週期增加一次。x86下相關的操作函式:
1 #include <asm/msr.h>
2 rdtsc(low32, high32);
3 rdtscl(low32);
4 rdtscll(var64);

通常情況下,只採用暫存器的低32位已經足夠——在1GHz的CPU上,每4.2秒才會溢位一次。

核心提供了通用的介面,定義在asm/timex.h:
#include <linux/timex.h>
cycles_t get_cycles(void);
關於處理器的時鐘暫存器,有以下一段話: There is one other thing worth knowing about timestamp counters: they are not necessarily synchronized across processors in an SMP system. To be sure of getting a coherent value, you should disable preemption for code that is querying the counter.   核心提供了函式將wall-clock time轉化為jiffies:
1 #include <linux/time.h>
2 unsigned long mktime (unsigned int year, unsigned int mon,
3                       unsigned int day, unsigned int hour,
4                       unsigned int min, unsigned int sec);

以及獲取當前時間的方法:

1 #include <linux/time.h>
2 void do_gettimeofday(struct timeval *tv);
3 struct timespec current_kernel_time(void);
  Delaying Execution
下列程式碼可以實現延時功能:
while (time_before(jiffies, j1))
    cpu_relax();

這種延時功能會導致CPU空轉,即忙等待,嚴重影響系統性能。如果在進入迴圈之前恰巧關閉了中斷,jiffies的值不會得到更新,導致迴圈的條件一直為真。

書中所說/proc/jitbusy的模組,我在3.10的核心中並未找到。 將上述程式碼的cpu_relax()函式替換為schedule(),可以在延時的過程中將CPU讓出;但是不能保證程序在之後能夠重新執行。 同樣地,/proc/jitsched也沒有找到。   要實現高效的延時,還需要核心的幫助,跟阻塞的I/O呼叫的函式相似:
1 #include <linux/wait.h>
2 long wait_event_timeout(wait_queue_head_t q, condition, long timeout);
3 long wait_event_interruptible_timeout(wait_queue_head_t q,
4                                 condition, long timeout);

一種典型的使用方法如下:

1 set_current_state(TASK_INTERRUPTTABLE);
2 schedule_timeout(delay);

 

核心提供了函式進行短時延時函式:
1 #include <linux/delay.h>
2 void ndelay(unsigned long nsecs);
3 void udelay(unsigned long usecs);
4 void mdelay(unsigned long msecs);

具體實現在asm/delay.h,體系結構相關。三個延時函式都是忙等待。

不是忙等待的函式如下:
1 void msleep(unsigned int millisecs);
2 unsigned long msleep_interruptible(unsigned int millisecs);
3 void ssleep(unsigned int seconds);

不帶有interruptible的函式不可中斷,一定能夠睡眠足夠的時間。

  Kernel Timers
核心計時器可以在不阻塞當前程序的情況下在某個時間點(時鐘週期)執行特定的引數。核心計數器定義在linux/timer.h,實現在kernel/timer.c中。 被排程執行的函式和其註冊函式通常是非同步執行的。許多操作需要程序的上下文才能執行,處於程序上下文之外時(例如在中斷上下文中),必須遵守以下規則:
  1. 不能訪問使用者空間——沒有程序上下文就沒有訪問使用者空間的路徑
  2. current指標在原子模式下沒有意義——current和被中斷的程序沒有關係
  3. 不能休眠或者直接間接呼叫排程函式,例如呼叫wait_event,kmalloc,以及訊號量
核心的程式碼可以呼叫in_interrupt()函式判斷當前是否執行在中斷上下文中,是的話返回0。in_atomic()可用來判斷是否能夠進行排程——可能是硬體或者軟體中斷上下文中,也可能持有自選鎖。後者current指標可能有效,但不能訪問使用者空間——可能導致排程。兩個函式都定義在asm/hardirq.h。 核心計數器的一個特點:一個程序可以重新註冊自己在之後的時間再次執行。這是因為timer可以重新註冊為活躍狀態(timer_list)。在SMP系統上,計時函式和註冊其的函式執行在相同的CPU,以此實現更好地cache區域性性。因此,重新註冊後執行在相同的CPU。還有,即使在單處理器系統上,計時器也會引入競爭——因為他們總是和其他程式碼非同步執行。因此,計時函式並行訪問的所有資料結構都必須通過自選鎖或原子型別進行保護。 核心提供的操作計時器的程式碼:
 1 #include <linux/timer.h>
 2 struct timer_list {
 3     /* ... */
 4     unsigned long expires;
 5     void (*function)(unsigned long);
 6     unsigned long data;
 7 };
 8 void init_timer(struct timer_list *timer);
 9 struct timer_list TIMER_INITIALIZER(_function, _expires, _data);
10 void add_timer(struct timer_list * timer);
11 int del_timer(struct timer_list * timer);

計時器在expires指明的jiffies後以data作為引數執行function,如果要傳入多個引數,可以將其轉化為指標。

核心提供的其他介面:
1 int mod_timer(struct timer_list *timer, unsigned long expires);
2 int del_timer_sync(struct timer_list *timer);
3 int timer_pending(const struct timer_list * timer);

mod_timer可以修改活躍的或者非活躍的計時器。del_timer_sync保證在返回時,計時器不會在任何CPU上執行;這個函式還可以避免SMP系統的競爭條件,和UP核心上的del_timer相同;持有鎖的時侯呼叫del_timer_sync函式要十分小心,如果計時函式也試圖獲得相同的鎖,系統可能會死鎖。timer_pending用來判斷計時器當前受否被排程來執行。

  核心實現計時器時遵循以下要求和假設:
  1. 計時器的管理必須儘可能輕量
  2. 活躍的計時器增加時design should scale well
  3. 絕大多數計時器會在幾秒或至多幾分鐘內失效,更長時間的計時器十分罕見
  4. 計時器執行在註冊其上的CPU
計時器實現在一個per-CPU資料結構上,timer_list資料結構包含一個指向該資料結構的指標base。如果base為空,表明這個計時器沒有排程來執行;否則這個指標表明執行這個計時器的資料結構。 核心程式碼註冊一個計時器時(mod_timer、add_timer),最終會呼叫internal_add_timer(實現在kernel/timer.c)將新的計時器加入到一個和當前CPU相關的級聯表(cascading table)內的雙向連結串列中。 級聯表工作機制如下:如果計時器在0到255個jiffies內失效,將其加入到根據expires的低8位確定的256個表中;類似的還有4個表(9-14,15-20,21-26,27-31)。如果計時器超時的時間更長,將其對映到0xffffffff。 呼叫__run_timers時,會執行所有當前計時刻度的所有待定的計時器。如果當前的jiffies是256的整數倍,將expires值更大的表重新對映。這種實現方法管理每個活躍的計時器時所需的時間和已經註冊的計時器的數量無關,只是需要花費4KB的記憶體空間來儲存512個(256+64*4書中之前的說法27-31只有32項,和此處相矛盾,綜合來看應該以此處為準0xffffffff)表頭。 __run_timers執行在原子上下文,有一個特點:即使在可搶佔核心CPU在核心空間很忙,計時器也能在正確的時間失效。需要注意的是,雖然計時器能夠勝任一些簡單的工作,但是對於工業環境中的生產系統,最好還是使用實時系統的核心。   Tasklets
任務集通常用來管理中斷。任務集和其排程程式執行在相同的CPU,接收一個unsinged long型的引數;但是無法指定執行程式碼的具體時間——核心會選擇執行的時間。和核心計時器類似,兩者都是執行在軟中斷上下文中。 核心提供的建立任務集的介面:
 1 #include <linux/interrupt.h>
 2 struct tasklet_struct {
 3     /* ... */
 4     void (*func)(unsigned long);
 5     unsigned long data;
 6 };
 7 void tasklet_init(struct tasklet_struct *t,
 8 void (*func)(unsigned long), unsigned long data);
 9 DECLARE_TASKLET(name, func, data);
10 DECLARE_TASKLET_DISABLED(name, func, data);

任務集具有以下特點:

  1. 可以啟用或者關閉,但是隻有啟用的次數和關閉的次數一樣多時才會執行
  2. 可以重新註冊自己
  3. 可以以高優先順序或者普通優先順序排程執行。前者通常先執行
  4. 在負載較低的系統上任務集可能會立刻執行,但不會晚於下一個計時器刻度
  5. 多個任務集可以併發,但是一個任務集只會執行在註冊其的CPU上
核心提供的其他介面如下:
1 void tasklet_disable(struct tasklet_struct *t);
2 void tasklet_disable_nosync(struct tasklet_struct *t);
3 void tasklet_enable(struct tasklet_struct *t);
4 void tasklet_schedule(struct tasklet_struct *t);
5 void tasklet_hi_schedule(struct tasklet_struct *t);
6 void tasklet_kill(struct tasklet_struct *t);

tasklet_disable關閉給定的任務集,仍然能夠通過tasklet_schedule排程,但是直到開啟之後才會執行。如果任務集當前正在執行,這個函式會忙等待直到任務集退出;因此,呼叫此函式之後,可以保證此任務集不會在系統中執行。有nosync的函式不會等正在執行的任務集退出才返回;因此此函式返回時任務集可能還在執行。

tasklet_enable開啟給定的任務集,如果任務集已經被排程,會立刻執行。此函式呼叫的次數必須和關閉函式的呼叫次數相匹配——核心會記錄每個任務集的關閉次數。

tasklet_schedule排程給定的任務集來執行,如果一個任務集在執行之前又被排程,只會執行一次。如果任務集在執行的時候被排程,在執行完成後會再次執行;這樣才能保證和其他事件同時發生的事件得到應有的關注,同樣保證一個任務集能夠重新排程自己。hi的函式可以以高優先順序排程任務集。

tasklet_kill保證給定的任務集不會再被排程,如果任務集被排程執行,此函式會等待其完成。在呼叫此函式前,任務集必須阻止其重新排程自己。

任務集實現在kernel/softirq.c中,兩個任務集列表(正常和高優先順序)都宣告為一個per-CPU資料結構。由於不需要排序,管理任務集的資料機構是一個簡單地連結串列。   Workqueues
工作佇列和任務集表面上類似,但是有以下不同:
  1. 任務集的程式碼執行在軟中斷上下文中,必須是原子的;而工作佇列執行在特殊的核心程序中,因此更零活,可以休眠
  2. 任務集通常執行在註冊其的處理器,工作佇列預設也是如此
  3. 核心程式碼可以要求工作佇列的函式推遲一段明確的時間
  4. 最關鍵的是,由於任務集是原子的,執行速度更快;而工作佇列不需要是原子的,延遲可能更高
工作佇列通過struct workqueue_struct描述,定義在linux/workqueue.h,建立函式如下:
1 struct workqueue_struct *create_workqueue(const char *name);
2 struct workqueue_struct *create_singlethread_workqueue(const char *name);
每個工作佇列都有一個或多個專有的程序(核心執行緒)來執行提交給這個佇列的函式。呼叫create_workqueue會在系統中的每個處理器上建立一個執行緒作為worker,而create_singlethread_workqueue只會建立一個執行緒。 要提交一個任務到工作佇列,首先需要填充一個work_struct結構。下列程式碼可以在編譯時完成:
1 DECLARE_WORK(name, void (*function)(void *), void *data);
name是需要宣告的資料結構的名稱,function是要呼叫的函式,data是要傳遞的引數。若要在執行時建立,可以通過下列程式碼:
1 INIT_WORK(struct work_struct *work, void (*function)(void *), void *data);
2 PREPARE_WORK(struct work_struct *work, void (*function)(void *), void *data);

初次建立時要呼叫INIT_WORK,PREPARE_WORK功能類似,但是不會初始化將work_sturct指向工作佇列的指標。如果要對資料結構進行修改,採用後者。

下列函式可以將work_struct提交給工作佇列:
1 int queue_work(struct workqueue_struct *queue, struct work_struct *work);
2 int queue_delayed_work(struct workqueue_struct *queue,
3                        struct work_struct *work, unsigned long delay);

queue_delayed_work直到delay指明的jiffies後才會執行work。function會在worker執行緒的上下文中執行,因此可以睡眠。f由於worker執行緒位於核心空間,unction無法訪問使用者空間。要取消未完成的工作佇列項:

1 int cancel_delayed_work(struct work_struct *work);
如果工作在執行前已經取消,函式返回非零值;如果返回0,工作可能已經在不同的處理器上執行。如果要保證返回0時工作不再執行,在cancel_delayed_work後呼叫
1 void flush_workqueue(struct workqueue_struct *queue);
若要銷燬工作佇列
1 void destroy_workqueue(struct workqueue_struct *queue);

 

很多情況下,裝置驅動不需要有自己的工作佇列,如果只是偶爾向工作佇列提交任務,可以使用效率更高的核心提供的共享工作佇列。相關的函式
1 int schedule_work(struct work_struct *work);
2 int schedule_delayed_work(struct work_struct *work, unsigned long delay);
3 void flush_scheduled_work(void);