如何理解create_singlethread_workqueue是嚴格按照順序執行的
我們知道工作佇列有三種,分別是PerCpu, Unbound,以及ORDERED這三種類型,正如之前的文件分析:
1.PerCpu的工作佇列:
API:
create_workqueue(name)
這種工作佇列在queue_work的時候,首先檢查當前的Cpu是哪一個,然後將work排程到該cpu下面的normal級別的執行緒池中執行。
注:針對PerCpu型別而言,系統在開機的時候會註冊2個執行緒池,一個低優先順序的,一個高優先順序的。
2.Unbound的工作佇列:
API:
create_freezable_workqueue(name)
這種工作佇列在queue_work的時候,同樣首先檢查當前的Cpu是哪一個,隨後需要計算當前的Cpu屬於哪一個Node,因為對於Unbound的工作佇列而言,執行緒池並不是繫結到cpu的而是繫結到Node的,隨後找到該Node對應的執行緒池中執行。需要留意的是這種工作佇列是考慮了功耗的,例如:當work排程的時候,排程器會盡量的讓已經休眠的cpu保持休眠,而將當前的work排程到其他active的cpu上去執行。
注:對於NUMA沒有使能的情況下,所有節點的執行緒池都會指向dfl的執行緒池。
3.Ordered的工作佇列:
API:
create_singlethread_workqueue(name) 或者 alloc_ordered_workqueue(fmt,
flags, args…)
這種work也是Unbound中的一種,但是這種工作佇列即便是在NUMA使能的情況下,所有Node的執行緒池都會被指向dfl的執行緒池,換句話說Ordered的工作佇列只有一個執行緒池,因為只有這樣才能保證Ordered的工作佇列是順序執行的,而這也是本文分析的切入點。
有關併發問題的總結性陳述:
首先對於Ordered的工作佇列(create_singlethread_workqueue,其他自定義的API則不一定了)這是嚴格順序執行的,絕對不可能出現併發(無論提交給wq的是否是同一個work)。但是對於PerCpu的工作佇列(create_workqueue),其中對於提交給wq的如果是同一個work,那麼也不會併發,會順序執行。但是如果提交給wq的不是同一個work,則會在不同的cpu間併發。需要特別留意的是,其並不會在同一個CPU的不同執行緒間併發,這是因為create_workqueue這個API定義的max_active為1,也就意味者,當前wq只能最多在每個cpu上併發1個執行緒。
#define alloc_ordered_workqueue(fmt, flags, args...) \
alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)
#define create_singlethread_workqueue(name) \
alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)
這裡面特別要去留意的是alloc_workqueue的第二個引數是flags,第三個引數表示當前工作佇列max_active的work個數,比如當前值為1,那麼在當前工作佇列中如果已經有work在執行中了,隨後排隊的work只能進入pwq->delayed_works的延遲佇列中,等到當前的work執行完畢後再順序執行。
那麼這兒有個疑問就是如果將active增大,是否意味著佇列中的work可以並行執行了呢,也不全是,如果當前排隊的work和正在執行的work是同一個的話則需要等待當前work執行完成後順序執行。如果當前排隊的work和正在執行的work不是同一個同時alloc_workqueue函式的第三個引數(max_active)大於1的話,那麼核心會為你線上程池中開啟一個新的執行緒來執行這個work。
OK,Read The Fuck Source.
kernel\Workqueue.c
static void __queue_work(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
.....
/*
1. 當第一次排程的時候,由於pwq->nr_active為0,低於max_active(1),則將work加入到執行緒池中的worklist中,pwq->nr_active自增.
2. 當第二次排程的時候,且第一次排程的work正在執行中(進入function了),由於pwq->nr_active為1,不低於max_active(1),則將work加入到執行緒池的delayed_works延遲列表中,並設定當前work的flag為WORK_STRUCT_DELAYED.
*/
if (likely(pwq->nr_active < pwq->max_active)) {
trace_workqueue_activate_work(work);
pwq->nr_active++;
worklist = &pwq->pool->worklist;
} else {
work_flags |= WORK_STRUCT_DELAYED;
worklist = &pwq->delayed_works;
}
//如上面的描述插入到對應的連結串列中
insert_work(pwq, work, worklist, work_flags);
....
}
static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
struct list_head *head, unsigned int extra_flags)
{
struct worker_pool *pool = pwq->pool;
//設定work的flag
set_work_pwq(work, pwq, extra_flags);
//將work加入到對應的執行緒池的worklist或者delayed_works連結串列中
list_add_tail(&work->entry, head);
...
//從執行緒池中取出處於idle的執行緒,喚醒它
if (__need_more_worker(pool))
wake_up_worker(pool);
}
我們繼續看看喚醒的執行緒中是怎麼處理的,是使用這個喚醒的idle執行緒呢?還是在原有的執行緒處理結束後再執行?
static int worker_thread(void *__worker)
{
...
woke_up:
/*
如下所示
1.針對第一次排程的情況,pool的worklist不為NULL,且pool->nr_running為0(意味著所有的worker都進入了阻塞狀態),則當前喚醒的執行緒將繼續處理這個work。
2.針對第二次排程的情況,且第一次排程的work正在執行中(進入function了),那麼由於pool的worklist為NULL(該work進入了延遲佇列),那麼,當前喚醒的worker會直接睡眠。
*/
if (!need_more_worker(pool))
goto sleep;
if (unlikely(!may_start_working(pool)) && manage_workers(worker))
goto recheck;
...
do {
...
process_one_work(worker, work);
...
} while (keep_working(pool));
....
sleep:
worker_enter_idle(worker);
__set_current_state(TASK_INTERRUPTIBLE);
spin_unlock_irq(&pool->lock);
schedule();
goto woke_up;
}
我們再看看process_one_work的執行過程
static void process_one_work(struct worker *worker, struct work_struct *work)
{
...
//這裡的理解也是非常的重要的
/*
首先線上程池中正在執行的執行緒中取出正在執行的work和當前想要處理的work進行比對,如果是同一個work那麼直接返回,等待原先的那個work處理結束後再緊接著處理。
*/
collision = find_worker_executing_work(pool, work);
if (unlikely(collision)) {
move_linked_works(work, &collision->scheduled, NULL);
return;
}
...
//真正處理這個work的地方
worker->current_func(work);
...
//判斷是否要處理延遲佇列的work
pwq_dec_nr_in_flight(pwq, work_color);
...
}
看看延遲佇列是怎麼提取出來的
static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color)
{
...
//當目前的work處理完成後,就可以將當前工作佇列(ordered型別)active的work減1到0了,也就是說當前工作佇列(ordered型別)又可以接收新的work了
pwq->nr_active--;
//如果之前的延遲佇列有待處理的work,那麼取出來加到pool->worklist,等到執行緒的下一次while迴圈的時候執行。
/*
流程如下:
pwq_activate_first_delayed->pwq_activate_delayed_work->move_linked_works
*/
if (!list_empty(&pwq->delayed_works)) {
/* one down, submit a delayed one */
if (pwq->nr_active < pwq->max_active)
pwq_activate_first_delayed(pwq);
}
...
}
static void pwq_activate_delayed_work(struct work_struct *work)
{
struct pool_workqueue *pwq = get_work_pwq(work);
trace_workqueue_activate_work(work);
move_linked_works(work, &pwq->pool->worklist, NULL);
__clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
pwq->nr_active++;
}
最後說明一下2個問題:
- pool->nr_running,這個flag表示當前執行緒池是否是阻塞或者Active狀態。
0: 阻塞狀態,work的function中有可能呼叫了導致sleep的函式,例如msleep,wait_interrupt,mutex等。這種情況下如果再次insert_work的話,需要在當前執行緒池中,開啟新的執行緒(這個執行緒有可能是在當前CPU的不同執行緒或者是不同的CPU上)去處理。
1:Active狀態,work的function還在執行中,且沒有導致sleep的操作。這種情況下如果再次insert_work的話,不需要再開啟新的執行緒了,直接在原有執行緒中處理即可。 - 第二個問題是針對unbound的工作佇列,其執行緒池是否需要額外建立的原則是屬性是否一致,屬性匹配只關注2個地方,一個是優先順序,一個是cpumask(當前工作是否可以在對應的cpu上執行)。