Linux核心實現透視---工作佇列
作為Linux中斷低半部的另一種實現機制的基礎,工作佇列的出現更多的是為了解決軟中斷和Tasklet對於使用者程序的時間片的不良影響問題的。工作佇列本身是可以使用核心執行緒來替代的,但是使用執行緒來實現複雜程度和記憶體資源的消耗是不利因素,所以Linux核心就實現了這一機制。通過核心執行緒(worker)作為工作佇列的執行者,單個工作採用struct work_struct來進行描述,而一系列的工作採用struct workqueue_struct來描述,最後為了更好的管理worker又抽象出了工作執行緒池由struct worker_pool描述,最後再由執行緒池和工作佇列關聯器用struct pool_workqueue來描述來管理工作佇列和執行緒池的關係。接下來逐一瞭解各個資料結構的定義和關係。
工作(work)
struct work_struct { //低位元位部分是work的標誌位,剩餘位元位通常用於存放上一次執行的worker_pool ID或pool_workqueue的指標。存放的內容有WORK_STRUCT_PWQ標誌位來決定 atomic_long_t data; //用於把work掛到其他佇列上。 struct list_head entry; //工作任務的處理函式 work_func_t func; #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif}
工作佇列(workqueue_struct)
struct workqueue_struct { struct list_head pwqs; /* WR: all pwqs of this wq */ struct list_head list; /* PL: list of all workqueues */ struct mutex mutex; /* protects this wq */ int work_color; /* WQ: current work color*/ int flush_color; /* WQ: current flush color */ atomic_t nr_pwqs_to_flush; /* flush in progress */ struct wq_flusher *first_flusher; /* WQ: first flusher */ struct list_head flusher_queue; /* WQ: flush waiters */ struct list_head flusher_overflow; /* WQ: flush overflow list */ //所有rescue狀態下的pool_workqueue資料結構連結串列 struct list_head maydays; /* MD: pwqs requesting rescue */ //rescue核心執行緒,記憶體緊張時建立新的工作執行緒可能會失敗, //如果建立workqueue是設定了WQ_MEM_RECLAIM,那麼rescuer執行緒會接管這種情況。 struct worker *rescuer; /* I: rescue worker */ int nr_drainers; /* WQ: drain in progress */ int saved_max_active; /* WQ: saved pwq max_active */ struct workqueue_attrs *unbound_attrs; /* WQ: only for unbound wqs */ struct pool_workqueue *dfl_pwq; /* WQ: only for unbound wqs */ #ifdef CONFIG_SYSFS struct wq_device *wq_dev; /* I: for sysfs interface */ #endif #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif char name[WQ_NAME_LEN]; /* I: workqueue name */ /* hot fields used during command issue, aligned to cacheline */ unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */ struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */ struct pool_workqueue __rcu *numa_pwq_tbl[]; /* FR: unbound pwqs indexed by node */ };
工作者(struct worker)
struct worker { /* on idle list while idle, on busy hash table while busy */ union { struct list_head entry; /* L: while idle */ struct hlist_node hentry; /* L: while busy */ }; struct work_struct *current_work; /* L: work being processed */ work_func_t current_func; /* L: current_work's fn */ struct pool_workqueue *current_pwq; /* L: current_work's pwq */ bool desc_valid; /* ->desc is valid */ struct list_head scheduled; /* L: scheduled works */ /* 64 bytes boundary on 64bit, 32 on 32bit */ struct task_struct *task; /* I: worker task */ struct worker_pool *pool; /* I: the associated pool */ /* L: for rescuers */ struct list_head node; /* A: anchored at pool->workers */ /* A: runs through worker->node */ unsigned long last_active; /* L: last active timestamp */ unsigned int flags; /* X: flags */ int id; /* I: worker id */ /* * Opaque string set with work_set_desc(). Printed out with task * dump for debugging - WARN, BUG, panic or sysrq. */ char desc[WORKER_DESC_LEN]; /* used only by rescuers to point to the target workqueue */ struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */ };
工作執行緒池(struct worker_pool)
struct worker_pool { spinlock_t lock; /* the pool lock */ int cpu; /* I: the associated cpu */ int node; /* I: the associated node ID */ int id; /* I: pool ID */ unsigned int flags; /* X: flags */ struct list_head worklist; /* L: list of pending works */ int nr_workers; /* L: total number of workers */ /* nr_idle includes the ones off idle_list for rebinding */ int nr_idle; /* L: currently idle ones */ struct list_head idle_list; /* X: list of idle workers */ struct timer_list idle_timer; /* L: worker idle timeout */ struct timer_list mayday_timer; /* L: SOS timer for workers */ /* a workers is either on busy_hash or idle_list, or the manager */ DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER); /* L: hash of busy workers */ /* see manage_workers() for details on the two manager mutexes */ struct mutex attach_mutex; /* attach/detach exclusion */ struct list_head workers; /* A: attached workers */ struct completion *detach_completion; /* all workers detached */ struct ida worker_ida; /* worker IDs for task name */ struct workqueue_attrs *attrs; /* I: worker attributes */ struct hlist_node hash_node; /* PL: unbound_pool_hash node */ int refcnt; /* PL: refcnt for unbound pools */ /* * The current concurrency level. As it's likely to be accessed * from other CPUs during try_to_wake_up(), put it in a separate * cacheline. */ atomic_t nr_running ____cacheline_aligned_in_smp; /* * Destruction of pool is sched-RCU protected to allow dereferences * from get_work_pool(). */ struct rcu_head rcu; } ____cacheline_aligned_in_smp;
執行緒池和工作佇列關聯(struct pool_workqueue)
struct pool_workqueue { struct worker_pool *pool; /* I: the associated pool */ struct workqueue_struct *wq; /* I: the owning workqueue */ int work_color; /* L: current color */ int flush_color; /* L: flushing color */ int refcnt; /* L: reference count */ int nr_in_flight[WORK_NR_COLORS]; /* L: nr of in_flight works */ int nr_active; /* L: nr of active works */ int max_active; /* L: max active works */ struct list_head delayed_works; /* L: delayed works */ struct list_head pwqs_node; /* WR: node on wq->pwqs */ struct list_head mayday_node; /* MD: node on wq->maydays */ /* * Release of unbound pwq is punted to system_wq. See put_pwq() * and pwq_unbound_release_workfn() for details. pool_workqueue * itself is also sched-RCU protected so that the first pwq can be * determined without grabbing wq->mutex. */ struct work_struct unbound_release_work; struct rcu_head rcu; } __aligned(1 << WORK_STRUCT_FLAG_BITS);
有時間了畫一個簡單的圖來表示他們的關係。
圖片(有時間了來填坑)
前面瞭解了大致的資料結構關係後下來再來看工作佇列的處理過程,因為資料結構的管理都是由核心完成的而驅動開發正真關係的的執行過程的細節。先從核心初始化執行緒和工作佇列開始一步步深入。
工作佇列的初始化
在系統啟動過程中通過init_workqueues()初始化了工作執行緒,並建立的一部分核心工作佇列。建立的規則是每一個CPU核心建立兩個執行緒一個高優先順序(核心最高)一個低優先順序(中間優先)的執行緒這些執行緒和CPU是繫結的,只處理指定CPU上的pool_workqueue。除此之外核心還建立了兩個個和CPU無關的執行緒可以用來處理所有的工作。這裡需要注意的是工作執行緒的建立也是根據啟用線上的CPU的個數建立的而不是總的CPU的個數,所以在CPU啟用的介面中會有回撥介面用於建立核心工作執行緒,對應的在CPU休眠時就有對應的銷燬回撥。來看原始碼:
static int __init init_workqueues(void) { //NR_STD_WORKER_POOLS = 2 對應一個高優先順序-20(最高)和一個低優先順序0(中間) int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL }; int i, cpu; WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long)); pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC); //這裡就是繫結在CPU啟用和休眠的時候建立和銷燬工作執行緒的介面 cpu_notifier(workqueue_cpu_up_callback, CPU_PRI_WORKQUEUE_UP); hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); wq_numa_init(); /* initialize CPU pools */ //遍歷每個CPU for_each_possible_cpu(cpu) { struct worker_pool *pool; i = 0; //這裡會拿到CPU per 型別的工作執行緒池結構體地址(實際上是陣列而CPU編號為index)到pool for_each_cpu_worker_pool(pool, cpu) { /* 呼叫init_worker_pool 初始化工作執行緒程池,繫結CPU,設定優先順序等 */ BUG_ON(init_worker_pool(pool)); pool->cpu = cpu; cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu)); pool->attrs->nice = std_nice[i++]; pool->node = cpu_to_node(cpu); /* alloc pool ID */ mutex_lock(&wq_pool_mutex); BUG_ON(worker_pool_assign_id(pool)); mutex_unlock(&wq_pool_mutex); } } /* create the initial worker */ //遍歷啟用的cpu for_each_online_cpu(cpu) { struct worker_pool *pool; //同上面一樣遍歷cpu per pool for_each_cpu_worker_pool(pool, cpu) { //建立執行緒線上程池中並啟動工作執行緒、修改執行緒池標誌 pool->flags &= ~POOL_DISASSOCIATED; BUG_ON(create_and_start_worker(pool) < 0); } } //建立不繫結的執行緒屬性並繫結 /* create default unbound and ordered wq attrs */ for (i = 0; i < NR_STD_WORKER_POOLS; i++) { struct workqueue_attrs *attrs; BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; unbound_std_wq_attrs[i] = attrs; /* * An ordered wq should have only one pwq as ordering is * guaranteed by max_active which is enforced by pwqs. * Turn off NUMA so that dfl_pwq is used for all nodes. */ BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); attrs->nice = std_nice[i]; attrs->no_numa = true; ordered_wq_attrs[i] = attrs; } //建立核心工作佇列 system_wq = alloc_workqueue("events", 0, 0); system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND, WQ_UNBOUND_MAX_ACTIVE); system_freezable_wq = alloc_workqueue("events_freezable", WQ_FREEZABLE, 0); system_power_efficient_wq = alloc_workqueue("events_power_efficient", WQ_POWER_EFFICIENT, 0); system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient", WQ_FREEZABLE | WQ_POWER_EFFICIENT, 0); BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq || !system_unbound_wq || !system_freezable_wq || !system_power_efficient_wq || !system_freezable_power_efficient_wq); return 0; }
具體來看建立和啟動核心工作執行緒的介面create_and_start_worker():
static int create_and_start_worker(struct worker_pool *pool) { struct worker *worker; worker = create_worker(pool); if (worker) { spin_lock_irq(&pool->lock); start_worker(worker); spin_unlock_irq(&pool->lock); } return worker ? 0 : -ENOMEM; } static struct worker *create_worker(struct worker_pool *pool) { struct worker *worker = NULL; int id = -1; char id_buf[16]; /* ID is needed to determine kthread name */ //從對應的pool中取執行緒id這體現了核心新的採用工作執行緒pool來管理工作執行緒的思想 id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL); if (id < 0) goto fail; worker = alloc_worker(); if (!worker) goto fail; //指定工作執行緒池和id worker->pool = pool; worker->id = id; //工作執行緒池中的cpu 指定繫結的cpu如果為-1則是不繫結CPU if (pool->cpu >= 0) snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id, pool->attrs->nice < 0 ? "H" : ""); else snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id); //關鍵點:建立工作執行緒 執行緒函式為worker_thread ,工作執行緒還被掛接到pool上 worker->task = kthread_create_on_node(worker_thread, worker, pool->node, "kworker/%s", id_buf); if (IS_ERR(worker->task)) goto fail; //由此可見執行緒池中的執行緒的優先順序和所屬的執行緒池相同 set_user_nice(worker->task, pool->attrs->nice); //這個標誌阻礙使用者介面修改當前執行緒優先順序 /* prevent userland from meddling with cpumask of workqueue workers */ worker->task->flags |= PF_NO_SETAFFINITY; //連結串列操作將worker 串入worker_pool /* successful, attach the worker to the pool */ worker_attach_to_pool(worker, pool); return worker; fail: if (id >= 0) ida_simple_remove(&pool->worker_ida, id); kfree(worker); return NULL; }
這裡也驗證我前面的一個猜測,即一個worker_pool中的的worker執行緒的優先順序都是繼承自所屬執行緒池的。啟動執行緒的操作時核心的通用執行緒操作這裡不在看,來重點藍worker執行緒介面函式worker_thread。
static int worker_thread(void *__worker) { struct worker *worker = __worker; struct worker_pool *pool = worker->pool; /* tell the scheduler that this is a workqueue worker */ //告訴排程器這是一個worker 可見工作執行緒的支援是深入核心的 worker->task->flags |= PF_WQ_WORKER; woke_up: spin_lock_irq(&pool->lock); //worker 需要銷燬?此時和他關聯的執行緒已經銷燬了(猜的找機會驗證) /* am I supposed to die? */ if (unlikely(worker->flags & WORKER_DIE)) { spin_unlock_irq(&pool->lock); WARN_ON_ONCE(!list_empty(&worker->entry)); worker->task->flags &= ~PF_WQ_WORKER; set_task_comm(worker->task, "kworker/dying"); ida_simple_remove(&pool->worker_ida, worker->id); worker_detach_from_pool(worker, pool); kfree(worker); return 0; } //清楚空閒標誌,並從空閒執行緒連結串列上移到執行執行緒list上 worker_leave_idle(worker); recheck: /* no more worker necessary? */ //如果當前worker_pool->worklist中沒有待處理的任務,並且當前pool沒有正在執行的worker這個介面返回False,這裡就會休眠 if (!need_more_worker(pool)) goto sleep; //may_start_working()檢查是否還有空閒狀態worker,沒有則通過 manage_workers()建立一個 /* do we need to manage? */ if (unlikely(!may_start_working(pool)) && manage_workers(worker)) //返回再次檢查 goto recheck; /* * ->scheduled list can only be filled while a worker is * preparing to process a work or actually processing it. * Make sure nobody diddled with it while I was sleeping. */ //scheduled 中儲存著正在處理的work或即將處理的work(必然) WARN_ON_ONCE(!list_empty(&worker->scheduled)); /* * Finish PREP stage. We're guaranteed to have at least one idle * worker or that someone else has already assumed the manager * role. This is where @worker starts participating in concurrency * management if applicable and concurrency management is restored * after being rebound. See rebind_workers() for details. */ //維護執行緒池屬性 這裡worker_pool->nr_running 計數維護 worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND); //遍歷執行緒池上的work,並處理 do { struct work_struct *work = list_first_entry(&pool->worklist, struct work_struct, entry); if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) { /* optimization path, not strictly necessary */ //當前worker 處理當前work process_one_work(worker, work); if (unlikely(!list_empty(&worker->scheduled))) //如果當前worker上有待處理的work,先處理它(這是個補丁應該) process_scheduled_works(worker); } else { //如果當前work_struct置位WORK_STRUCT_LINKED表示work後面還串上其它work, //把這些work遷移到woeker_pool->scheduled中,然後一併再用process_one_work()函式處理。 move_linked_works(work, &worker->scheduled, NULL); process_scheduled_works(worker); } } while (keep_working(pool)); //處理完了 worker_set_flags(worker, WORKER_PREP, false); //空閒睡眠處理 sleep: /* * pool->lock is held and there's no work to process and no need to * manage, sleep. Workers are woken up only while holding * pool->lock or from local cpu, so setting the current state * before releasing pool->lock is enough to prevent losing any * event. */ worker_enter_idle(worker); __set_current_state(TASK_INTERRUPTIBLE); spin_unlock_irq(&pool->lock); schedule(); goto woke_up; }
然後就是具體的處理過程其實上面的process_scheduled_works()介面最後實際也是呼叫的process_one_work()介面進行處理的所以這裡來看一下具體的處理過程:
static void process_scheduled_works(struct worker *worker) { while (!list_empty(&worker->scheduled)) { struct work_struct *work = list_first_entry(&worker->scheduled, struct work_struct, entry); process_one_work(worker, work); } } static void process_one_work(struct worker *worker, struct work_struct *work) __releases(&pool->lock) __acquires(&pool->lock) { struct pool_workqueue *pwq = get_work_pwq(work); struct worker_pool *pool = worker->pool; //判斷當前的workqueue是否是CPU_INTENSIVE,會對其所在工作執行緒進行特殊設定。 bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE; int work_color; struct worker *collision; #ifdef CONFIG_LOCKDEP /* * It is permissible to free the struct work_struct from * inside the function that is called from it, this we need to * take into account for lockdep too. To avoid bogus "held * lock freed" warnings as well as problems when looking into * work->lockdep_map, make a copy and use that here. */ struct lockdep_map lockdep_map; lockdep_copy_map(&lockdep_map, &work->lockdep_map); #endif /* ensure we're on the correct CPU */ WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) && raw_smp_processor_id() != pool->cpu); /* * A single work shouldn't be executed concurrently by * multiple workers on a single cpu. Check whether anyone is * already processing the work. If so, defer the work to the * currently executing one. */ //查詢當前work是否在worker_pool->busy_hash表中正在執行, //如果在就移到當前work正在執行的worker->scheduled並退出當前處理。 collision = find_worker_executing_work(pool, work); if (unlikely(collision)) { move_linked_works(work, &collision->scheduled, NULL); return; } /* claim and dequeue */ debug_work_deactivate(work); hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work); worker->current_work = work; worker->current_func = work->func; worker->current_pwq = pwq; work_color = get_work_color(work); list_del_init(&work->entry); /* * CPU intensive works don't participate in concurrency management. * They're the scheduler's responsibility. This takes @worker out * of concurrency management and the next code block will chain * execution of the pending work items. */ if (unlikely(cpu_intensive)) //設定當前工作執行緒flags,排程器就知道核心執行緒屬性了, //但實際上排程器暫時並沒有做特殊處理。 worker_set_flags(worker, WORKER_CPU_INTENSIVE); /* * Wake up another worker if necessary. The condition is always * false for normal per-cpu workers since nr_running would always * be >= 1 at this point. This is used to chain execution of the * pending work items for WORKER_NOT_RUNNING workers such as the * UNBOUND and CPU_INTENSIVE ones. */ //判斷是否需要喚醒更多工作執行緒,wake_up_worker()去喚 //醒worker_pool中第一個idle執行緒。對於bound型worker_pool此時一般nr_running>=1,所以條件不成立。 if (need_more_worker(pool)) wake_up_worker(pool); /* * Record the last pool and clear PENDING which should be the last * update to @work. Also, do this inside @pool->lock so that * PENDING and queued state changes happen together while IRQ is * disabled. */ //清除struct worker中data成員pending標誌位, //裡面使用了smp_wmb保證了pending之前的寫操作完成之後才清除pending。 set_work_pool_and_clear_pending(work, pool->id); spin_unlock_irq(&pool->lock); lock_map_acquire_read(&pwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); trace_workqueue_execute_start(work); //真正執行work的回撥函式 worker->current_func(work); /* * While we must be careful to not use "work" after this, the trace * point will only record its address. */ trace_workqueue_execute_end(work); lock_map_release(&lockdep_map); lock_map_release(&pwq->wq->lockdep_map); if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n" " last function: %pf\n", current->comm, preempt_count(), task_pid_nr(current), worker->current_func); debug_show_held_locks(current); dump_stack(); } /* * The following prevents a kworker from hogging CPU on !PREEMPT * kernels, where a requeueing work item waiting for something to * happen could deadlock with stop_machine as such work item could * indefinitely requeue itself while all other CPUs are trapped in * stop_machine. At the same time, report a quiescent RCU state so * the same condition doesn't freeze RCU. */ cond_resched_rcu_qs(); spin_lock_irq(&pool->lock); /* clear cpu intensive status */ if (unlikely(cpu_intensive)) worker_clr_flags(worker, WORKER_CPU_INTENSIVE); /* we're done with it, release */ //work回撥函式執行完成後的清理工作 hash_del(&worker->hentry); worker->current_work = NULL; worker->current_func = NULL; worker->current_pwq = NULL; worker->desc_valid = false; pwq_dec_nr_in_flight(pwq, work_color); }
處理過程除了涉及工作佇列的資料維護外其餘的需要注意的就是清除struct worker中data成員pending標誌位的操作了,他可以說明工作佇列排程一次執行一次處理就結束了。到這裡其實關鍵的工作佇列相關的處理的部分都算已經處理完了。其餘剩餘的CPU啟用和睡眠的回撥介面不再去看內容精煉一下,其次是工作佇列的建立相關的內容暫時不用就不去看了看用不上就記不住--徒勞。接下來重點看一下如何使用工作佇列。
工作佇列的使用
工作佇列的使用就三部曲,定義一個工作,初始化工作,排程他(其實是將其加入到核心的資料結構中由核心排程執行)。
定義工作
可以使用動態分配的也可以使用靜態方式定義,最後再呼叫__INIT_WORK()來初始化work,其次是衍生的一些其他介面,大多是標誌位的不同,其次是delay_work這才是中斷低半部常用的介面。
初始化
#define __INIT_WORK(_work, _func, _onstack) \ do { \ __init_work((_work), _onstack); \ (_work)->data = (atomic_long_t) WORK_DATA_INIT(); \ INIT_LIST_HEAD(&(_work)->entry); \ (_work)->func = (_func); \ } while (0)
這裡在將work封裝的相關介面記錄一下:
#define TIMER_DEFERRABLE 0x1LU #define TIMER_IRQSAFE 0x2LU #define __INIT_WORK(_work, _func, _onstack) \ do { \ __init_work((_work), _onstack); \ (_work)->data = (atomic_long_t) WORK_DATA_INIT(); \ INIT_LIST_HEAD(&(_work)->entry); \ (_work)->func = (_func); \ } while (0) #define INIT_WORK(_work, _func) \ __INIT_WORK((_work), (_func), 0) #define INIT_WORK_ONSTACK(_work, _func) \ __INIT_WORK((_work), (_func), 1) #define __INIT_DELAYED_WORK(_work, _func, _tflags) \ do { \ INIT_WORK(&(_work)->work, (_func)); \ __setup_timer(&(_work)->timer, delayed_work_timer_fn, \ (unsigned long)(_work), \ (_tflags) | TIMER_IRQSAFE); \ } while (0) #define __INIT_DELAYED_WORK_ONSTACK(_work, _func, _tflags) \ do { \ INIT_WORK_ONSTACK(&(_work)->work, (_func)); \ __setup_timer_on_stack(&(_work)->timer, \ delayed_work_timer_fn, \ (unsigned long)(_work), \ (_tflags) | TIMER_IRQSAFE); \ } while (0) #define INIT_DELAYED_WORK(_work, _func) \ __INIT_DELAYED_WORK(_work, _func, 0) #define INIT_DELAYED_WORK_ONSTACK(_work, _func) \ __INIT_DELAYED_WORK_ONSTACK(_work, _func, 0) #define INIT_DEFERRABLE_WORK(_work, _func) \ __INIT_DELAYED_WORK(_work, _func, TIMER_DEFERRABLE) #define INIT_DEFERRABLE_WORK_ONSTACK(_work, _func) \ __INIT_DELAYED_WORK_ONSTACK(_work, _func, TIMER_DEFERRABLE)
排程
work初始化完成後就需要將work加入到工作佇列中去了,而預設的工作佇列是system_wq回想初始化過程建立的工作佇列。呼叫的介面是schedule_work()如下的呼叫過程:
/** * schedule_work - put work task in global workqueue * @work: job to be done * * Returns %false if @work was already on the kernel-global workqueue and * %true otherwise. * * This puts a job in the kernel-global workqueue if it was not already * queued and leaves it in the same position on the kernel-global * workqueue otherwise. */ static inline bool schedule_work(struct work_struct *work) { return queue_work(system_wq, work); } /** * queue_work - queue work on a workqueue * @wq: workqueue to use * @work: work to queue * * Returns %false if @work was already on a queue, %true otherwise. * * We queue the work to the CPU on which it was submitted, but if the CPU dies * it can be processed by another CPU. */ static inline bool queue_work(struct workqueue_struct *wq, struct work_struct *work) { return queue_work_on(WORK_CPU_UNBOUND, wq, work); } /** * queue_work_on - queue work on specific cpu * @cpu: CPU number to execute work on * @wq: workqueue to use * @work: work to queue * * We queue the work to a specific CPU, the caller must ensure it * can't go away. * * Return: %false if @work was already on a queue, %true otherwise. */ bool queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work) { bool ret = false; unsigned long flags; local_irq_save(flags); if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { __queue_work(cpu, wq, work); ret = true; } local_irq_restore(flags); return ret; }
由這個過程可以知道,這個介面新增的工作佇列是不指定具體的CPU執行的,其中的__queue_work()是重要的新增操作過程:
static void __queue_work(int cpu, struct workqueue_struct *wq, struct work_struct *work) { struct pool_workqueue *pwq; struct worker_pool *last_pool; struct list_head *worklist; unsigned int work_flags; unsigned int req_cpu = cpu; //是否處於關中斷狀態 WARN_ON_ONCE(!irqs_disabled()); debug_work_activate(work); /* __WQ_DRAINING表示要銷燬workqueue,那麼掛入workqueue中所有的work都要處理完 畢才能把這個workqueue銷燬。在銷燬過程中,一般不允許再有新的work加入佇列中。 有一種特殊例外是正在清空work時觸發了一個queue work操作,這種情況被稱為chained work。 */ /* if draining, only works from the same workqueue are allowed */ if (unlikely(wq->flags & __WQ_DRAINING) && WARN_ON_ONCE(!is_chained_work(wq))) return; retry: if (req_cpu == WORK_CPU_UNBOUND) cpu = raw_smp_processor_id(); /* pwq which will be used unless @work is executing elsewhere */ if (!(wq->flags & WQ_UNBOUND)) //對於bound型的workqueue,直接使用本地CPU對應pool_workqueue。 pwq = per_cpu_ptr(wq->cpu_pwqs, cpu); else //對於unbound型,呼叫unbound_pwq_by_node()尋找本地node節點對應的unbound型別的pool_workqueue。 pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu)); /* * If @work was previously on a different pool, it might still be * running there, in which case the work needs to be queued on that * pool to guarantee non-reentrancy. */ //通過work_struct的成員data查詢該work上一次是在哪個worker_pool中執行的。 last_pool = get_work_pool(work); //如果上次執行的worker_pool和本次不一致 if (last_pool && last_pool != pwq->pool) { struct worker *worker; spin_lock(&last_pool->lock); //判斷一個work是否正在last_pool上執行,也即不在當前worker_pool執行,如果是返回這個正在執行的工作執行緒worker worker = find_worker_executing_work(last_pool, work); if (worker && worker->current_pwq->wq == wq) { //利用當前work正在執行的pool_workqueue,利用快取熱度,不進行排程。 pwq = worker->current_pwq; } else { /* meh... not running there, queue here */ spin_unlock(&last_pool->lock); spin_lock(&pwq->pool->lock); } } else { spin_lock(&pwq->pool->lock); } if (unlikely(!pwq->refcnt)) { //對unbound型別pool_workqueue釋放是非同步的,當refcnt減少到0時, //說明該pool_workqueue已經被釋放,那麼需要跳轉到retry出重新選擇pool_workqueue。 if (wq->flags & WQ_UNBOUND) { spin_unlock(&pwq->pool->lock); cpu_relax(); goto retry; } /* oops */ WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt", wq->name, cpu); } /* pwq determined, queue */ trace_workqueue_queue_work(req_cpu, pwq, work); if (WARN_ON(!list_empty(&work->entry))) { spin_unlock(&pwq->pool->lock); return; } pwq->nr_in_flight[pwq->work_color]++; work_flags = work_color_to_flags(pwq->work_color); if (likely(pwq->nr_active < pwq->max_active)) { //判斷當前pool_workqueue的work活躍數量,如果少於最高限值, //就加入pending狀態連結串列worker_pool->worklist,否則加入delayed_works連結串列中。 trace_workqueue_activate_work(work); pwq->nr_active++; worklist = &pwq->pool->worklist; } else { work_flags |= WORK_STRUCT_DELAYED; worklist = &pwq->delayed_works; } //將當前work加入到pool_workqueue->worklist尾部。 insert_work(pwq, work, worklist, work_flags); spin_unlock(&pwq->pool->lock); }
在關中斷的情況下將,work通過insert_work新增到對應的work_queue中。在新增的過程最後會檢查當前的執行緒池中是否有工作的執行緒即執行緒池有活動的執行緒,因為只有執行緒池中有活動的執行緒才能回執行worker執行緒處理函式,進而發現有待處理的work進行執行處理。所以如果沒有任何worker在執行則需要喚醒一個執行緒採用wake_up_worker(pool)介面。這個介面的執行也十分簡單就是進入執行緒池找到第一個空閒的worker然後執行執行緒喚醒介面wake_up_process(task)喚醒這個執行緒即可。static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
struct list_head *head, unsigned int extra_flags) { struct worker_pool *pool = pwq->pool; /* we own @work, set data and link */ //把pool_workqueue指標的值和一些flag設定到data成員中, //方便下次呼叫queue_work()知道本次使用哪個pool_workqueue()。 set_work_pwq(work, pwq, extra_flags); 將work加入到worker_pool->worklist尾部。 list_add_tail(&work->entry, head); //增加pool_workqueue->refcnt成員引用計數。 get_pwq(pwq); /* * Ensure either wq_worker_sleeping() sees the above * list_add_tail() or we see zero nr_running to avoid workers lying * around lazily while there are works to be processed. */ //保證wake_up_worker()喚醒worker時,在__schedule()->wq_worker_sleeping()時 //這裡的list_add_tail()已經完成。同時保證下面__need_more_worker()讀取nr_running時連結串列已經完成。 smp_mb();
//如果當前nr_running為0,表示當前worker可能並沒有處於執行狀態需要喚醒一個工作執行緒。 if (__need_more_worker(pool))那麼需要wake_up_worker() wake_up_worker(pool); }
除了以上的介面還有其他的介面可以控制work的新增
//指定執行處理的CPU int schedule_work_on(int cpu, struct work_struct *work) { return queue_work_on(cpu, system_wq, work); } //延遲執行這個work,一般作為中斷低半部處理機制使用在中斷上下文呼叫而在程序上下文處理 int schedule_delayed_work(struct delayed_work *dwork, unsigned long delay) { return queue_delayed_work(system_wq, dwork, delay); } //指定在那個cpu上延遲處理,上面的結合體 int schedule_delayed_work_on(int cpu, struct delayed_work *dwork, unsigned long delay) { return queue_delayed_work_on(cpu, system_wq, dwork, delay); }
schedule_work()介面最後預設是將工作加了到system_wq,上面的這些介面支援將工作佇列加到別的工作佇列上進行處理,所以這裡在瞭解一下其他的工作佇列如下。
其他工作佇列
- schedule_work(),其預設將work放入system_wq上。系統還有其它很多預設workqueue,這些workqueue也都是通過queue_work()將work放入其上。下面介紹一些其它系統全域性workqueue的使用。
- system_highpri_wq 和system_wq的區別在於WQ_HIGHPRI,這些work對應的工作執行緒位於cpu_worker_pool[1]中。工作執行緒的nice為-20,要比system_wq對應的工作執行緒優先順序要高。
- system_long_wq和system_wq類似,但是一般system_long_wq用於執行時間較長的work,而system_wq放執行較短的work。這兩個workqueue沒有明顯的區別,更多的是靠使用者自覺。
- system_nrt_wq相對於system_wq使用了WQ_NON_REENTRANT。預設情況下工作佇列只是確保在同一CPU不可重入,即工作在同一CPU上不會被多個工作執行緒併發執行,但容許在多個CPU上併發執行。該標誌表明在多個CPU上也是不可重入的,工作將在不可重入workqueue上,並確保至多在一個系統範圍內的工作執行緒上執行。
- system_unbound_wq相對於system_wq的區別是被設定為WQ_UNBOUND,沒有併發管理,且work最大活躍數不超過WQ_UNBOUND_MAX_ACTIVE,一般為WQ_MAX_ACTIVE=512。
- system_unbound_wq對應的工作執行緒不會被繫結到特定CPU,所有排隊的work會被立即執行,只要資源足夠並且不超過最大活躍數。
- system_freezable_wq 相對於system_wq多了WQ_FREEZABLE標誌,表示可以凍結workqueue參與系統的暫停操作,該workqueue的工作將被暫停,除非被喚醒,否者沒有新的work被執行。
- system_power_efficient_wq相對於system_wq多了WQ_POWER_EFFICIENT標誌,將工作隊列表示為unbound已達到節省功耗的目的,並且還需要wq_power_efficient開啟。否則和system_wq沒啥區別。
- system_freezable_power_efficient_wq兼具system_freezable_wq的freezable和system_power_efficient_wq的power efficient兩個特性。
在有些時候還可能需要自己建立工作佇列來完成自己的work的排程使用,此時會使用建立工作佇列的介面相關的內容,這裡暫時記錄不去深究後續用到了再來完善。
取消一個work
work新增到工作佇列後會被執行,但有時也需要取消比如驅動程式在關閉裝置節點,出現錯誤,或者要掛起時,需要取消一個已經被排程的work。cancel_work_sync()函式取消一個已經排程的work,該函式的工作流程圖如下。
以上就是Linux核心工作佇列的相關流程和處理過車梳理,不是全部但是可以對工作佇列的工作特性骨架有一定的瞭解。
參考: