1. 程式人生 > >如何理解create_singlethread_workqueue是嚴格按照順序執行的

如何理解create_singlethread_workqueue是嚴格按照順序執行的

我們知道工作佇列有三種,分別是PerCpu, Unbound,以及ORDERED這三種類型,正如之前的文件分析:
1.PerCpu的工作佇列:
API:

create_workqueue(name)

這種工作佇列在queue_work的時候,首先檢查當前的Cpu是哪一個,然後將work排程到該cpu下面的normal級別的執行緒池中執行。
注:針對PerCpu型別而言,系統在開機的時候會註冊2個執行緒池,一個低優先順序的,一個高優先順序的。
2.Unbound的工作佇列:
API:

create_freezable_workqueue(name)

這種工作佇列在queue_work的時候,同樣首先檢查當前的Cpu是哪一個,隨後需要計算當前的Cpu屬於哪一個Node,因為對於Unbound的工作佇列而言,執行緒池並不是繫結到cpu的而是繫結到Node的,隨後找到該Node對應的執行緒池中執行。需要留意的是這種工作佇列是考慮了功耗的,例如:當work排程的時候,排程器會盡量的讓已經休眠的cpu保持休眠,而將當前的work排程到其他active的cpu上去執行。

注:對於NUMA沒有使能的情況下,所有節點的執行緒池都會指向dfl的執行緒池。
3.Ordered的工作佇列:
API:

create_singlethread_workqueue(name) 或者 alloc_ordered_workqueue(fmt,
flags, args…)

這種work也是Unbound中的一種,但是這種工作佇列即便是在NUMA使能的情況下,所有Node的執行緒池都會被指向dfl的執行緒池,換句話說Ordered的工作佇列只有一個執行緒池,因為只有這樣才能保證Ordered的工作佇列是順序執行的,而這也是本文分析的切入點。


有關併發問題的總結性陳述:

   首先對於Ordered的工作佇列(create_singlethread_workqueue,其他自定義的API則不一定了)這是嚴格順序執行的,絕對不可能出現併發(無論提交給wq的是否是同一個work)。
   但是對於PerCpu的工作佇列(create_workqueue),其中對於提交給wq的如果是同一個work,那麼也不會併發,會順序執行。但是如果提交給wq的不是同一個work,則會在不同的cpu間併發。需要特別留意的是,其並不會在同一個CPU的不同執行緒間併發,這是因為create_workqueue這個API定義的max_active為1,也就意味者,當前wq只能最多在每個cpu上併發1個執行緒。

#define alloc_ordered_workqueue(fmt, flags, args...)			\
	alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args)
#define create_singlethread_workqueue(name)				\
	alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)

    這裡面特別要去留意的是alloc_workqueue的第二個引數是flags,第三個引數表示當前工作佇列max_active的work個數,比如當前值為1,那麼在當前工作佇列中如果已經有work在執行中了,隨後排隊的work只能進入pwq->delayed_works的延遲佇列中,等到當前的work執行完畢後再順序執行。

    那麼這兒有個疑問就是如果將active增大,是否意味著佇列中的work可以並行執行了呢,也不全是,如果當前排隊的work和正在執行的work是同一個的話則需要等待當前work執行完成後順序執行。如果當前排隊的work和正在執行的work不是同一個同時alloc_workqueue函式的第三個引數(max_active)大於1的話,那麼核心會為你線上程池中開啟一個新的執行緒來執行這個work。

OK,Read The Fuck Source.

kernel\Workqueue.c

static void __queue_work(int cpu, struct workqueue_struct *wq,
			 struct work_struct *work)
{
 	.....
 	/*
 	    1. 當第一次排程的時候,由於pwq->nr_active為0,低於max_active(1),則將work加入到執行緒池中的worklist中,pwq->nr_active自增.
 	    2. 當第二次排程的時候,且第一次排程的work正在執行中(進入function了),由於pwq->nr_active為1,不低於max_active(1),則將work加入到執行緒池的delayed_works延遲列表中,並設定當前work的flag為WORK_STRUCT_DELAYED.
 	*/
 	if (likely(pwq->nr_active < pwq->max_active)) {
		trace_workqueue_activate_work(work);
		pwq->nr_active++;
		worklist = &pwq->pool->worklist;
	} else {
		work_flags |= WORK_STRUCT_DELAYED;
		worklist = &pwq->delayed_works;
	}
	//如上面的描述插入到對應的連結串列中	
	insert_work(pwq, work, worklist, work_flags);
	....
}

static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
			struct list_head *head, unsigned int extra_flags)
{
	struct worker_pool *pool = pwq->pool;	
	//設定work的flag
	set_work_pwq(work, pwq, extra_flags);
	//將work加入到對應的執行緒池的worklist或者delayed_works連結串列中
	list_add_tail(&work->entry, head);
	...
	//從執行緒池中取出處於idle的執行緒,喚醒它
	if (__need_more_worker(pool))
			wake_up_worker(pool);
}

我們繼續看看喚醒的執行緒中是怎麼處理的,是使用這個喚醒的idle執行緒呢?還是在原有的執行緒處理結束後再執行?

static int worker_thread(void *__worker)
{
	...
woke_up:
		/*
			如下所示
			1.針對第一次排程的情況,pool的worklist不為NULL,且pool->nr_running為0(意味著所有的worker都進入了阻塞狀態),則當前喚醒的執行緒將繼續處理這個work。
			2.針對第二次排程的情況,且第一次排程的work正在執行中(進入function了),那麼由於pool的worklist為NULL(該work進入了延遲佇列),那麼,當前喚醒的worker會直接睡眠。
		*/
		if (!need_more_worker(pool))
		goto sleep;
		if (unlikely(!may_start_working(pool)) && manage_workers(worker))
		goto recheck;
		...
		do {
			...
			process_one_work(worker, work);
			...
		} while (keep_working(pool));
		....
sleep:
		worker_enter_idle(worker);
		__set_current_state(TASK_INTERRUPTIBLE);
		spin_unlock_irq(&pool->lock);
		schedule();
		goto woke_up;		
}

我們再看看process_one_work的執行過程

static void process_one_work(struct worker *worker, struct work_struct *work)
{
	...
		//這裡的理解也是非常的重要的
		/*
		  首先線上程池中正在執行的執行緒中取出正在執行的work和當前想要處理的work進行比對,如果是同一個work那麼直接返回,等待原先的那個work處理結束後再緊接著處理。
		*/
		collision = find_worker_executing_work(pool, work);
		if (unlikely(collision)) {
			move_linked_works(work, &collision->scheduled, NULL);
			return;
		}
	...
		//真正處理這個work的地方
		worker->current_func(work);
	...
		//判斷是否要處理延遲佇列的work
		pwq_dec_nr_in_flight(pwq, work_color);
	...
}

看看延遲佇列是怎麼提取出來的

static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color)
{    
	...
	//當目前的work處理完成後,就可以將當前工作佇列(ordered型別)active的work減1到0了,也就是說當前工作佇列(ordered型別)又可以接收新的work了
	pwq->nr_active--;
	//如果之前的延遲佇列有待處理的work,那麼取出來加到pool->worklist,等到執行緒的下一次while迴圈的時候執行。
	/*
	   流程如下:
	   pwq_activate_first_delayed->pwq_activate_delayed_work->move_linked_works
	*/
	if (!list_empty(&pwq->delayed_works)) {
		/* one down, submit a delayed one */
		if (pwq->nr_active < pwq->max_active)
			pwq_activate_first_delayed(pwq);
	}
	...
}

static void pwq_activate_delayed_work(struct work_struct *work)
{
	struct pool_workqueue *pwq = get_work_pwq(work);

	trace_workqueue_activate_work(work);
	move_linked_works(work, &pwq->pool->worklist, NULL);
	__clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
	pwq->nr_active++;
}

最後說明一下2個問題:

  1. pool->nr_running,這個flag表示當前執行緒池是否是阻塞或者Active狀態。
    0: 阻塞狀態,work的function中有可能呼叫了導致sleep的函式,例如msleep,wait_interrupt,mutex等。這種情況下如果再次insert_work的話,需要在當前執行緒池中,開啟新的執行緒(這個執行緒有可能是在當前CPU的不同執行緒或者是不同的CPU上)去處理。
    1:Active狀態,work的function還在執行中,且沒有導致sleep的操作。這種情況下如果再次insert_work的話,不需要再開啟新的執行緒了,直接在原有執行緒中處理即可。
  2. 第二個問題是針對unbound的工作佇列,其執行緒池是否需要額外建立的原則是屬性是否一致,屬性匹配只關注2個地方,一個是優先順序,一個是cpumask(當前工作是否可以在對應的cpu上執行)。