工作佇列詳解
1.工作佇列的建立
INIT_WORK(&work_demo, work_demo_func);
workqueue_demo = create_singlethread_workqueue("workqueue demo");
queue_work(workqueue_demo,&work_demo);
#define alloc_workqueue(fmt, flags, max_active, args...) \ __alloc_workqueue_key((fmt), (flags), (max_active), \ NULL, NULL, ##args) #define alloc_ordered_workqueue(fmt, flags, args...) \ alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args) #define create_singlethread_workqueue(name) \ alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)
這裡面有三個引數需要說明:
WQ_UNBOUND:
這個flag的workqueue說明其work的處理不需要繫結在特定的CPU上執行,workqueue需要關聯一個系統中的unbound worker thread pool。如果系統中能找到匹配的執行緒池(根據workqueue的屬性(attribute)),那麼就選擇一個,如果找不到適合的執行緒池,workqueue就會建立一個worker thread pool來處理work。
WQ_MEM_RECLAIM:
如果該workqueue的work相互之間沒有相關性,則不需要這個flag
這個flag相關的概念是rescuer thread。前面我們描述解決併發問題的時候說到:對於A B C D四個work,當正在處理的B work被阻塞後,worker pool會建立一個新的worker thread來處理其他的work,但是,在memory資源比較緊張的時候,建立worker thread未必能夠成功,這時候,如果B work是依賴C或者D work的執行結果的時候,系統進入dead lock。這種狀態是由於不能建立新的worker thread導致的,如何解決呢?對於每一個標記WQ_MEM_RECLAIM flag的work queue,系統都會建立一個rescuer thread,當發生這種情況的時候,C或者D work會被rescuer thread接手處理,從而解除了dead lock。
如果設定了這個flag,那麼工作佇列在建立的時候會建立一個救助者核心執行緒備用,執行緒的名稱為工作佇列的名字
__WQ_ORDERED:
這個flag說明工作佇列中的work都是順序執行的,不存在併發的情況。
__alloc_workqueue_key if (flags & WQ_UNBOUND) { //先分配一個屬性 wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL); } alloc_and_link_pwqs(wq) ... 將當前的工作佇列加入到全域性的workqueues連結串列中 list_add_tail_rcu(&wq->list, &workqueues); ...
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
bool highpri = wq->flags & WQ_HIGHPRI;
int cpu, ret;
if (!(wq->flags & WQ_UNBOUND)) { ---針對常規的工作佇列(per cpu型別)
.........
} else if (wq->flags & __WQ_ORDERED) { ---針對orderd型別,順序執行的工作佇列
ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
return ret;
} else { ---針對UNBOUND型別的工作佇列,能夠節省功耗
.....................
}
}
其中ordered_wq_attrs這個全域性的指標變數是在linux啟動的時候就已經完成了初始化的,如下:
BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
attrs->nice = std_nice[i];
attrs->no_numa = true;
ordered_wq_attrs[i] = attrs;
關於屬性結構的定義如下:
struct workqueue_attrs {
int nice; /* nice level */
cpumask_var_t cpumask; /* allowed CPUs */
bool no_numa; /* disable NUMA affinity */
};
nice是一個和thread優先順序相關的屬性,nice越低則優先順序越高。cpumask是該workqueue掛入的work允許在哪些cpu上執行。no_numa是一個和NUMA affinity相關的設定。
OK,言歸正傳
apply_workqueue_attrs_locked
->apply_wqattrs_prepare
->apply_wqattrs_commit
先看apply_wqattrs_prepare函式:
apply_wqattrs_prepare(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs)
{
struct apply_wqattrs_ctx *ctx;
struct workqueue_attrs *new_attrs, *tmp_attrs;
int node;
ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
GFP_KERNEL);
//首先分配2個臨時的屬性指標
new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
//拷貝使用者設定的屬性到new_attrs
copy_workqueue_attrs(new_attrs, attrs);
/*
1. 對於4核的CPU,wq_unbound_cpumask代表4個核;
2. 如果使用者沒有設定這個屬性,那麼最後相與的結果就是new_attrs->cpumask為空
*/
cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
if (unlikely(cpumask_empty(new_attrs->cpumask)))
cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
//如果使用者沒有配置cpumask的屬性那麼,使用預設屬性(當前晶片CPU的個數)
//拷貝當前的cpumask屬性到臨時屬性變數中
copy_workqueue_attrs(tmp_attrs, new_attrs);
/*
1.建立預設的pwq,針對unbound的情況下是先通過屬性在unbound_pool_hash中尋找是否存在屬性相同的執行緒池,如果存在則直接取出執行緒池pool,再申請pwq的結構體,建立pwq,wq,pool之間的關係,最後返回pwq指標給ctx->dfl_pwq.
2.如果該屬性在unbound_pool_hash中找不到與之對應屬性的執行緒池,那麼就通過init_worker_pool建立一個執行緒池,將當前的unbound的屬性拷貝到pool->attrs中,再通過create_worker建立該執行緒池的第一個worker執行緒,同時將該執行緒池加入到全域性的unbound_pool_hash表中。然後返回這個新建立的執行緒池pool,隨後申請pwq的結構體,建議pwq,wq,pool之間的關係,最後返回pwq指標給ctx->dfl_pwq.
*/
ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
if (!ctx->dfl_pwq)
goto out_free;
//針對numa的情況,每個node都會對應一個pwq,這裡需要特別注意的是numa是需要重新計算cpumask的,因此才會在最初申請了2個屬性指標。
for_each_node(node) {
if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
if (!ctx->pwq_tbl[node])
goto out_free;
} else {
//特別關注這個地方,如果是非numa的情況下,或者order的workqueue,那麼所有node的pwq都會指向dfl_pwq.
ctx->dfl_pwq->refcnt++;
ctx->pwq_tbl[node] = ctx->dfl_pwq;
}
}
/* 檢查使用者配置的屬性,檢查合法性,最後返回ctx指標 */
copy_workqueue_attrs(new_attrs, attrs);
cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
ctx->attrs = new_attrs;
ctx->wq = wq;
return ctx;
}
再來看apply_wqattrs_commit
static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
{
int node;
//拷貝使用者配置的屬性到ctx->wq->unbound_attrs
copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
/* save the previous pwq and install the new one */
for_each_node(node)
ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
ctx->pwq_tbl[node]);
/* @dfl_pwq might not have been used, ensure it's linked */
link_pwq(ctx->dfl_pwq);
swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
}
上面這個函式簡單來說就是呼叫link_pwq,將pool_workqueue掛入它所屬的workqueue的連結串列中
link_pwq
list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
2. 工作佇列的排程
INIT_WORK(&work_demo, work_demo_func);
queue_work(workqueue_demo,&work_demo);
include\linux\Workqueue.h
queue_work
queue_work_on(WORK_CPU_UNBOUND, wq, work);
bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
//如果當前的工作已經在workqueue中排隊了(但是沒有進入執行),那麼再次queue的work將會被丟棄掉,但是需要特別留意的是,只要這個work進入執行狀態後(即便沒有執行完),相同的work就可以再次進入排隊狀態了。
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_work(cpu, wq, work);
ret = true;
}
return ret;
}
繼續
static void __queue_work(int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct pool_workqueue *pwq;
struct worker_pool *last_pool;
struct list_head *worklist;
unsigned int work_flags;
unsigned int req_cpu = cpu;
//如果當前的工作佇列已經處於銷燬狀態了,那麼就不要再調動了
if (unlikely(wq->flags & __WQ_DRAINING) &&
WARN_ON_ONCE(!is_chained_work(wq)))
return;
retry:
//預設通過queue_work排程的work都是不指定cpu的,系統獲取當前的cpu
if (req_cpu == WORK_CPU_UNBOUND)
cpu = raw_smp_processor_id();
/*
這裡有2種情況:
1.對於percpu型別,通過cpu獲取繫結在該cpu的pwq,從而獲取到對應的執行緒池;
2.對於unbound型別,需要先通過cpu獲取到node,再通過node獲取到該node下的pwq,從而得到執行緒池;
*/
if (!(wq->flags & WQ_UNBOUND))
pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
else
pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
/*獲取上一次執行這個work的執行緒池*/
last_pool = get_work_pool(work);
/*如果這個執行緒池存在,且該執行緒池和當前cpu或者node對應的執行緒池不是同一個,同時這個work還在執行中,那麼新加入的work還是應該在原來的執行緒池中排隊*/
if (last_pool && last_pool != pwq->pool) {
struct worker *worker;
spin_lock(&last_pool->lock);
worker = find_worker_executing_work(last_pool, work);
if (worker && worker->current_pwq->wq == wq) {
pwq = worker->current_pwq;
} else {
/* meh... not running there, queue here */
spin_unlock(&last_pool->lock);
spin_lock(&pwq->pool->lock);
}
} else {
spin_lock(&pwq->pool->lock);
}
...
//找到執行緒池對應的worklist
if (likely(pwq->nr_active < pwq->max_active)) {
trace_workqueue_activate_work(work);
pwq->nr_active++;
worklist = &pwq->pool->worklist;
} else {
work_flags |= WORK_STRUCT_DELAYED;
worklist = &pwq->delayed_works;
}
//將當前的work插入到worklist,等待被排程
insert_work(pwq, work, worklist, work_flags);
spin_unlock(&pwq->pool->lock);
}
再看看insert_work的實現
static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
struct list_head *head, unsigned int extra_flags)
{
struct worker_pool *pool = pwq->pool;
/*設定work的flags*/
set_work_pwq(work, pwq, extra_flags);
/*真正將work插入到worklist中*/
list_add_tail(&work->entry, head);
/*增加pwq的引用計數*/
get_pwq(pwq);
smp_mb();
/*
確認當前的執行緒池是否還處於活躍狀態:
1. 如果當前的執行緒池已經沒有在運行了,那麼首先需要找到執行緒池中處於idle的執行緒,隨後喚醒這個task。
2. 如果當前的執行緒池依然在執行,那麼很簡單,找到active的執行緒,執行即可。
*/
if (__need_more_worker(pool))
wake_up_worker(pool);
}