1. 程式人生 > >工作佇列詳解

工作佇列詳解

1.工作佇列的建立

INIT_WORK(&work_demo, work_demo_func);
workqueue_demo = create_singlethread_workqueue("workqueue demo");
queue_work(workqueue_demo,&work_demo);

#define alloc_workqueue(fmt, flags, max_active, args...)		\
	__alloc_workqueue_key((fmt), (flags), (max_active),		\
			      NULL, NULL, ##args)
#define alloc_ordered_workqueue(fmt, flags, args...)			\
	alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)
#define create_singlethread_workqueue(name)				\
	alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)

這裡面有三個引數需要說明:
WQ_UNBOUND:
這個flag的workqueue說明其work的處理不需要繫結在特定的CPU上執行,workqueue需要關聯一個系統中的unbound worker thread pool。如果系統中能找到匹配的執行緒池(根據workqueue的屬性(attribute)),那麼就選擇一個,如果找不到適合的執行緒池,workqueue就會建立一個worker thread pool來處理work。
WQ_MEM_RECLAIM:
如果該workqueue的work相互之間沒有相關性,則不需要這個flag
這個flag相關的概念是rescuer thread。前面我們描述解決併發問題的時候說到:對於A B C D四個work,當正在處理的B work被阻塞後,worker pool會建立一個新的worker thread來處理其他的work,但是,在memory資源比較緊張的時候,建立worker thread未必能夠成功,這時候,如果B work是依賴C或者D work的執行結果的時候,系統進入dead lock。這種狀態是由於不能建立新的worker thread導致的,如何解決呢?對於每一個標記WQ_MEM_RECLAIM flag的work queue,系統都會建立一個rescuer thread,當發生這種情況的時候,C或者D work會被rescuer thread接手處理,從而解除了dead lock。
如果設定了這個flag,那麼工作佇列在建立的時候會建立一個救助者核心執行緒備用,執行緒的名稱為工作佇列的名字


__WQ_ORDERED:
這個flag說明工作佇列中的work都是順序執行的,不存在併發的情況。

__alloc_workqueue_key
	if (flags & WQ_UNBOUND) {
	   //先分配一個屬性
		wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
	}
	alloc_and_link_pwqs(wq)
	...
	將當前的工作佇列加入到全域性的workqueues連結串列中
	list_add_tail_rcu(&wq->list, &workqueues);
	...

static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
	bool highpri = wq->flags & WQ_HIGHPRI;
	int cpu, ret;

	if (!(wq->flags & WQ_UNBOUND)) {   ---針對常規的工作佇列(per cpu型別)
			.........
	} else if (wq->flags & __WQ_ORDERED) {  ---針對orderd型別,順序執行的工作佇列
		ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
		return ret;
	} else { ---針對UNBOUND型別的工作佇列,能夠節省功耗
	.....................
	}
}

其中ordered_wq_attrs這個全域性的指標變數是在linux啟動的時候就已經完成了初始化的,如下:

BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
attrs->nice = std_nice[i];
attrs->no_numa = true;
ordered_wq_attrs[i] = attrs;

關於屬性結構的定義如下:

struct workqueue_attrs { 
    int            nice;        /* nice level */ 
    cpumask_var_t        cpumask;    /* allowed CPUs */ 
    bool            no_numa;    /* disable NUMA affinity */ 
}; 

nice是一個和thread優先順序相關的屬性,nice越低則優先順序越高。cpumask是該workqueue掛入的work允許在哪些cpu上執行。no_numa是一個和NUMA affinity相關的設定。
OK,言歸正傳

apply_workqueue_attrs_locked
	->apply_wqattrs_prepare
	    ->apply_wqattrs_commit

先看apply_wqattrs_prepare函式:

apply_wqattrs_prepare(struct workqueue_struct *wq,
		      const struct workqueue_attrs *attrs)
{
	struct apply_wqattrs_ctx *ctx;
	struct workqueue_attrs *new_attrs, *tmp_attrs;
	int node;
    
	ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
		      GFP_KERNEL);
		      
	//首先分配2個臨時的屬性指標
	new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
	tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
	//拷貝使用者設定的屬性到new_attrs
	copy_workqueue_attrs(new_attrs, attrs);
	/*
		1. 對於4核的CPU,wq_unbound_cpumask代表4個核;
		2. 如果使用者沒有設定這個屬性,那麼最後相與的結果就是new_attrs->cpumask為空
	*/
	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
	if (unlikely(cpumask_empty(new_attrs->cpumask)))
		cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
	//如果使用者沒有配置cpumask的屬性那麼,使用預設屬性(當前晶片CPU的個數)
	
	//拷貝當前的cpumask屬性到臨時屬性變數中
	copy_workqueue_attrs(tmp_attrs, new_attrs);

	/*
		1.建立預設的pwq,針對unbound的情況下是先通過屬性在unbound_pool_hash中尋找是否存在屬性相同的執行緒池,如果存在則直接取出執行緒池pool,再申請pwq的結構體,建立pwq,wq,pool之間的關係,最後返回pwq指標給ctx->dfl_pwq.
		2.如果該屬性在unbound_pool_hash中找不到與之對應屬性的執行緒池,那麼就通過init_worker_pool建立一個執行緒池,將當前的unbound的屬性拷貝到pool->attrs中,再通過create_worker建立該執行緒池的第一個worker執行緒,同時將該執行緒池加入到全域性的unbound_pool_hash表中。然後返回這個新建立的執行緒池pool,隨後申請pwq的結構體,建議pwq,wq,pool之間的關係,最後返回pwq指標給ctx->dfl_pwq.
	*/
	ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
	if (!ctx->dfl_pwq)
		goto out_free;
    //針對numa的情況,每個node都會對應一個pwq,這裡需要特別注意的是numa是需要重新計算cpumask的,因此才會在最初申請了2個屬性指標。
	for_each_node(node) {
		if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
			ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
			if (!ctx->pwq_tbl[node])
				goto out_free;
		} else {
			//特別關注這個地方,如果是非numa的情況下,或者order的workqueue,那麼所有node的pwq都會指向dfl_pwq.
			ctx->dfl_pwq->refcnt++;
			ctx->pwq_tbl[node] = ctx->dfl_pwq;
		}
	}

	/* 檢查使用者配置的屬性,檢查合法性,最後返回ctx指標 */
	copy_workqueue_attrs(new_attrs, attrs);
	cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
	ctx->attrs = new_attrs;

	ctx->wq = wq;
	return ctx;

}

再來看apply_wqattrs_commit

static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
{
	int node;
	//拷貝使用者配置的屬性到ctx->wq->unbound_attrs
	copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);

	/* save the previous pwq and install the new one */
	for_each_node(node)
		ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
							  ctx->pwq_tbl[node]);

	/* @dfl_pwq might not have been used, ensure it's linked */
	link_pwq(ctx->dfl_pwq);
	swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);

}

上面這個函式簡單來說就是呼叫link_pwq,將pool_workqueue掛入它所屬的workqueue的連結串列中

link_pwq
	list_add_rcu(&pwq->pwqs_node, &wq->pwqs);

2. 工作佇列的排程

INIT_WORK(&work_demo, work_demo_func);
queue_work(workqueue_demo,&work_demo);

include\linux\Workqueue.h

queue_work
	queue_work_on(WORK_CPU_UNBOUND, wq, work);

bool queue_work_on(int cpu, struct workqueue_struct *wq,
		   struct work_struct *work)
{
	//如果當前的工作已經在workqueue中排隊了(但是沒有進入執行),那麼再次queue的work將會被丟棄掉,但是需要特別留意的是,只要這個work進入執行狀態後(即便沒有執行完),相同的work就可以再次進入排隊狀態了。
	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
		__queue_work(cpu, wq, work);
		ret = true;
	}
	return ret;
}

繼續

static void __queue_work(int cpu, struct workqueue_struct *wq,
			 struct work_struct *work)
{
	struct pool_workqueue *pwq;
	struct worker_pool *last_pool;
	struct list_head *worklist;
	unsigned int work_flags;
	unsigned int req_cpu = cpu;


	//如果當前的工作佇列已經處於銷燬狀態了,那麼就不要再調動了
	if (unlikely(wq->flags & __WQ_DRAINING) &&
	    WARN_ON_ONCE(!is_chained_work(wq)))
		return;
retry:
   //預設通過queue_work排程的work都是不指定cpu的,系統獲取當前的cpu
	if (req_cpu == WORK_CPU_UNBOUND)
		cpu = raw_smp_processor_id();

	/*
		這裡有2種情況:
		1.對於percpu型別,通過cpu獲取繫結在該cpu的pwq,從而獲取到對應的執行緒池;
		2.對於unbound型別,需要先通過cpu獲取到node,再通過node獲取到該node下的pwq,從而得到執行緒池;
	*/
	if (!(wq->flags & WQ_UNBOUND))
		pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
	else
		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

	/*獲取上一次執行這個work的執行緒池*/
	last_pool = get_work_pool(work);
	
	/*如果這個執行緒池存在,且該執行緒池和當前cpu或者node對應的執行緒池不是同一個,同時這個work還在執行中,那麼新加入的work還是應該在原來的執行緒池中排隊*/
	if (last_pool && last_pool != pwq->pool) {
		struct worker *worker;

		spin_lock(&last_pool->lock);

		worker = find_worker_executing_work(last_pool, work);

		if (worker && worker->current_pwq->wq == wq) {
			pwq = worker->current_pwq;
		} else {
			/* meh... not running there, queue here */
			spin_unlock(&last_pool->lock);
			spin_lock(&pwq->pool->lock);
		}
	} else {
		spin_lock(&pwq->pool->lock);
	}

	...
   //找到執行緒池對應的worklist
	if (likely(pwq->nr_active < pwq->max_active)) {
		trace_workqueue_activate_work(work);
		pwq->nr_active++;
		worklist = &pwq->pool->worklist;
	} else {
		work_flags |= WORK_STRUCT_DELAYED;
		worklist = &pwq->delayed_works;
	}

	//將當前的work插入到worklist,等待被排程
	insert_work(pwq, work, worklist, work_flags);

	spin_unlock(&pwq->pool->lock);
}

再看看insert_work的實現

static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
			struct list_head *head, unsigned int extra_flags)
{
	struct worker_pool *pool = pwq->pool;

	/*設定work的flags*/
	set_work_pwq(work, pwq, extra_flags);
	/*真正將work插入到worklist中*/
	list_add_tail(&work->entry, head);
	/*增加pwq的引用計數*/
	get_pwq(pwq);
	
	smp_mb();
	/* 
		確認當前的執行緒池是否還處於活躍狀態:
		1. 如果當前的執行緒池已經沒有在運行了,那麼首先需要找到執行緒池中處於idle的執行緒,隨後喚醒這個task。
		2. 如果當前的執行緒池依然在執行,那麼很簡單,找到active的執行緒,執行即可。
	*/
	if (__need_more_worker(pool))
		wake_up_worker(pool);
}