1. 程式人生 > 實用技巧 >Linux核心實現透視---軟中斷&Tasklet

Linux核心實現透視---軟中斷&Tasklet

軟中斷

首先明確一個概念軟中斷(不是軟體中斷intn)。總來來說軟中斷就是核心在啟動時為每一個核心建立了一個特殊的程序,這個程序會不停的poll檢查是否有軟中斷需要執行,如果需要執行則呼叫註冊的介面函式。所以軟中斷是執行在程序上下文的,而且可能併發執行在不同CPU上。所謂的軟中斷就是核心利用核心執行緒配合抽象的資料結構進行管理執行緒合適時間呼叫註冊的介面的一套軟體管理機制。

先看管理軟中斷的資料結構因為資料結構最能說明邏輯核心對軟體中斷抽象的資料結構主要有如下幾個部分。

中斷服務介面管理

在核心中宣告在\kernel\softirq.c中如下

#ifndef __ARCH_IRQ_STAT
irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;
EXPORT_SYMBOL(irq_stat);
#endif static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp; struct softirq_action { void (*action)(struct softirq_action *); };

其中的NR_SOFTIRQS由軟中斷型別的列舉物件提供如下定義:

enum
{
    HI_SOFTIRQ=0,
    TIMER_SOFTIRQ,
    NET_TX_SOFTIRQ,
    NET_RX_SOFTIRQ,
    BLOCK_SOFTIRQ,
    BLOCK_IOPOLL_SOFTIRQ,
    TASKLET_SOFTIRQ,
    SCHED_SOFTIRQ,
    HRTIMER_SOFTIRQ,
    RCU_SOFTIRQ,    
/* Preferable RCU should always be the last softirq */ NR_SOFTIRQS };

之所以綜上可以知道核心維護了一個struct softirq_action型別的軟中斷介面陣列,而軟中斷的狀態則是由前面的irq_cpustat_t型別的陣列管理,由定義可以知道狀態是和CPU關聯的,表示某一個CPU上的軟中斷狀態。下面看看irq_cpustat_t的定義,也非常的的簡單主要就是其中的__softirq_pending成員,這個成員的每一個bit表示一種型別的中斷型別的狀態資訊,並且低bit的中斷型別的中斷優先順序高。

typedef struct
{ unsigned int __softirq_pending;//標記是否有軟中斷懸起 long idle_timestamp; /* 統計資訊 */ /* Hard interrupt statistics. */ unsigned int irq_timer_count; unsigned int irq_syscall_count; unsigned int irq_resched_count; unsigned int irq_hv_flush_count; unsigned int irq_call_count; unsigned int irq_hv_msg_count; unsigned int irq_dev_intr_count; } ____cacheline_aligned irq_cpustat_t;

在通過Tasklet接介面中斷的建立就可以知道軟體中斷的註冊(open_softirq)過程就是修改前面定義的softirq_vec陣列,就可以完成軟體中斷的註冊,而驅動開發人員也很少直接使用軟體中斷。

//介面中的nr就是上面列舉值,action就是軟中斷服務函式
open_softirq(int nr,void(*action)(struct softirq_action *)); 

再看核心在啟動時為每個CPU建立的執行緒操作:

static struct notifier_block cpu_nfb = {
    .notifier_call = cpu_callback
};

static struct smp_hotplug_thread softirq_threads = {
    .store            = &ksoftirqd,
    .thread_should_run    = ksoftirqd_should_run,
    .thread_fn        = run_ksoftirqd,
    .thread_comm        = "ksoftirqd/%u",
};

static __init int spawn_ksoftirqd(void)
{
    register_cpu_notifier(&cpu_nfb);

    BUG_ON(smpboot_register_percpu_thread(&softirq_threads));

    return 0;
}
early_initcall(spawn_ksoftirqd);

重點是這個介面函式smpboot_register_percpu_thread如下:

/**
 * smpboot_register_percpu_thread - Register a per_cpu thread related to hotplug
 * @plug_thread:    Hotplug thread descriptor
 *
 * Creates and starts the threads on all online cpus.
 */
int smpboot_register_percpu_thread(struct smp_hotplug_thread *plug_thread)
{
    unsigned int cpu;
    int ret = 0;

    get_online_cpus();
    mutex_lock(&smpboot_threads_lock);
    for_each_online_cpu(cpu) {
        ret = __smpboot_create_thread(plug_thread, cpu);
        if (ret) {
            smpboot_destroy_threads(plug_thread);
            goto out;
        }
        smpboot_unpark_thread(plug_thread, cpu);
    }
    list_add(&plug_thread->list, &hotplug_threads);
out:
    mutex_unlock(&smpboot_threads_lock);
    put_online_cpus();
    return ret;
}

傳進來的引數是softirq_threads,先獲取線上即啟用的CPU然後遍歷呼叫__smpboot_create_thread引數同樣是前面定義的softirq_threads繼續向下看:

tatic int
__smpboot_create_thread(struct smp_hotplug_thread *ht, unsigned int cpu)
{
    struct task_struct *tsk = *per_cpu_ptr(ht->store, cpu);
    struct smpboot_thread_data *td;

    if (tsk)
        return 0;

    td = kzalloc_node(sizeof(*td), GFP_KERNEL, cpu_to_node(cpu));
    if (!td)
        return -ENOMEM;
    td->cpu = cpu;
    td->ht = ht;

    tsk = kthread_create_on_cpu(smpboot_thread_fn, td, cpu,
                    ht->thread_comm);
    if (IS_ERR(tsk)) {
        kfree(td);
        return PTR_ERR(tsk);
    }
    get_task_struct(tsk);
    *per_cpu_ptr(ht->store, cpu) = tsk;
    if (ht->create) {
        /*
         * Make sure that the task has actually scheduled out
         * into park position, before calling the create
         * callback. At least the migration thread callback
         * requires that the task is off the runqueue.
         */
        if (!wait_task_inactive(tsk, TASK_PARKED))
            WARN_ON(1);
        else
            ht->create(cpu);
    }
    return 0;
}

看建立了一個核心執行緒在特定CPU上通過kthread_create_on_cpu(smpboot_thread_fn,td,cpu,ht->thread_comm)介面,不在往深入繼續看,這裡只需要建立了一個繫結CPU的執行緒,執行緒函式是smpboot_thread_fn這個比較重要需要詳細看一下。傳入的data就是一個struct smpboot_thread_data型別的資料這個資料中儲存了softirq_threads在ht中如下,程序開始執行時先關閉搶佔,檢查是否需要停止當前執行緒如果需要則立馬停止當前執行緒,這裡肯定不需要停止除非是關機(我的理解)。然就是檢查是否要暫停,因為使用者的軟中斷介面可能呼叫阻塞介面會阻塞當前內爾後程序所以需要暫停當前執行緒最後的恢復也是有使用者軟體中斷服務函式完成(我的理解)最後部分原始碼註釋如下:

static int smpboot_thread_fn(void *data)
{
    struct smpboot_thread_data *td = data;
    struct smp_hotplug_thread *ht = td->ht;

    while (1) {
        set_current_state(TASK_INTERRUPTIBLE);
//關閉核心搶佔機制 preempt_disable();
     //是否需要停止當前執行緒關機時才執行??
if (kthread_should_stop()) { __set_current_state(TASK_RUNNING); preempt_enable(); if (ht->cleanup) ht->cleanup(td->cpu, cpu_online(td->cpu)); kfree(td); return 0; } if (kthread_should_park()) { __set_current_state(TASK_RUNNING); preempt_enable(); if (ht->park && td->status == HP_THREAD_ACTIVE) { BUG_ON(td->cpu != smp_processor_id()); ht->park(td->cpu); td->status = HP_THREAD_PARKED; } kthread_parkme(); /* We might have been woken for stop */ continue; } BUG_ON(td->cpu != smp_processor_id()); /* Check for state change setup */ switch (td->status) { case HP_THREAD_NONE: __set_current_state(TASK_RUNNING); preempt_enable(); if (ht->setup) ht->setup(td->cpu); td->status = HP_THREAD_ACTIVE; continue; case HP_THREAD_PARKED: __set_current_state(TASK_RUNNING); preempt_enable(); if (ht->unpark) ht->unpark(td->cpu); td->status = HP_THREAD_ACTIVE; continue; } /* * 就是通過呼叫ksoftirqd_should_run 這是在一開始定義的softirq_threads中指定的,檢查當前CPU上維護的軟體中斷陣列中是否有中斷 * 的置起了從而決定當前的軟體中斷執行緒是否需要執行,不需要執行則放棄時間片 */ if (!ht->thread_should_run(td->cpu)) {
       /*
       *沒有需要的軟體中斷需要執行,則放棄時間片
       */ preempt_enable_no_resched(); schedule(); }
else { /* * 有中斷需要執行則直接呼叫 run_ksoftirqd 執行軟體中斷註冊的介面的呼叫 */ __set_current_state(TASK_RUNNING); preempt_enable(); //這個介面在上面初始化時繫結為run_ksoftirqd ht->thread_fn(td->cpu); } } }
可以看到run_ksoftirqd如下:
static void run_ksoftirqd(unsigned int cpu)
{
    local_irq_disable();
    if (local_softirq_pending()) {
        /*
         * We can safely run softirq on inline stack, as we are not deep
         * in the task stack here.
         */
        __do_softirq();
        local_irq_enable();
        cond_resched_rcu_qs();
        return;
    }
    local_irq_enable();
}

關閉本CPU上的硬中斷然後執行__do_softirq();這個是軟體中斷的重點介面如下,註釋了一部分:

asmlinkage __visible void __do_softirq(void)
{
    unsigned long end = jiffies + MAX_SOFTIRQ_TIME;
    unsigned long old_flags = current->flags;
    int max_restart = MAX_SOFTIRQ_RESTART;
    struct softirq_action *h;
    bool in_hardirq;
    __u32 pending;
    int softirq_bit;

    /*
     * Mask out PF_MEMALLOC s current task context is borrowed for the
     * softirq. A softirq handled such as network RX might set PF_MEMALLOC
     * again if the socket is related to swap
     */
    current->flags &= ~PF_MEMALLOC;
    //儲存懸起的軟體中斷的點陣圖
    pending = local_softirq_pending();
    account_irq_enter_time(current);
    //標記進入軟體中斷上下文
    __local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);
    in_hardirq = lockdep_softirq_start();

restart:
    /* Reset the pending bitmask before enabling irqs */
    //清除懸起的軟體中斷的點陣圖
    set_softirq_pending(0);
    //開啟硬體中斷
    local_irq_enable();
    //取軟體中斷的全域性中斷介面連結串列
    h = softirq_vec;
    //判斷是否有懸起的軟體中斷bit,返回地最低置起的bit位置 1開始而不是0,軟中斷也是由優先順序的低bit優先
    while ((softirq_bit = ffs(pending))) {
        unsigned int vec_nr;
        int prev_count;
        //取出對應的中斷物件
        h += softirq_bit - 1;
        //取出對應的中斷index
        vec_nr = h - softirq_vec;
        prev_count = preempt_count();

        kstat_incr_softirqs_this_cpu(vec_nr);

        trace_softirq_entry(vec_nr);
        //執行軟體中斷註冊的介面函式
        h->action(h);
        trace_softirq_exit(vec_nr);
        if (unlikely(prev_count != preempt_count())) {
            pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
                   vec_nr, softirq_to_name[vec_nr], h->action,
                   prev_count, preempt_count());
            preempt_count_set(prev_count);
        }
        //清除剛才處理過的中斷bit並右移動整個點陣圖,然後移動軟體中斷控制代碼
        h++;
        pending >>= softirq_bit;
        //移動後繼續回去處理剩下置起的bit
    }
    //到這裡說明本次進來時置起的bit全部處理完了
    rcu_bh_qs();
    local_irq_disable();
    //再檢查在處理期間有無新置起的軟體中斷,如果有則需要繼續處理軟體中斷
    pending = local_softirq_pending();
    if (pending) {
        /*
        *又有新的軟體標誌置起需要處理,則開始處理,這裡有一個保護機制,因為軟體中斷的優先順序是很高的相對於使用者程序如果軟體中斷
        *源源不斷則需要進行保護避免其他程序無法執行而導致系統實時性差,這裡有三個條件一個步滿足就會會停止本次的軟體中斷的執行
        *而先去執行其他程序排程
        *1、軟中斷處理時間不超過2jiffies,200Hz的系統對應10ms;
        *2、當前沒有有程序需要排程,即!need_resched();
        *3、這種迴圈不超過MAX_SOFTIRQ_RESTART次 一般是10
        */
        if (time_before(jiffies, end) && !need_resched() &&
            --max_restart)
            goto restart;
        //不滿足其中一個條件則重新喚醒ksoftirq核心執行緒來處理軟中斷,因為這個函式可能在中斷上下文執行所以需要進行限制
        wakeup_softirqd();
    }

    lockdep_softirq_end(in_hardirq);
    account_irq_exit_time(current);
    //使能中斷底半部
    __local_bh_enable(SOFTIRQ_OFFSET);
    WARN_ON_ONCE(in_interrupt());
    tsk_restore_flags(current, old_flags, PF_MEMALLOC);
}

注意軟體中斷的處理過程對軟中斷連續執行的時間進行了限制其實是有原因的,因為上述軟中斷處理部分的程式碼執行機會有可能在中斷上下文irq_exit()具體的呼叫鏈就是irq_exit()->invoke_softirq()->wakeup_softirq()如下(可參考硬中斷的分析過程):

void irq_exit(void)
{
#ifndef __ARCH_IRQ_EXIT_IRQS_DISABLED
    local_irq_disable();
#else
    WARN_ON_ONCE(!irqs_disabled());
#endif

    account_irq_exit_time(current);
    preempt_count_sub(HARDIRQ_OFFSET);
    if (!in_interrupt() && local_softirq_pending())
        invoke_softirq();

    tick_irq_exit();
    rcu_irq_exit();
    trace_hardirq_exit(); /* must be last! */
}

static inline void invoke_softirq(void)
{
    if (!force_irqthreads) {
#ifdef CONFIG_HAVE_IRQ_EXIT_ON_IRQ_STACK
        /*
         * We can safely execute softirq on the current stack if
         * it is the irq stack, because it should be near empty
         * at this stage.
         */
        __do_softirq();
#else
        /*
         * Otherwise, irq_exit() is called on the task stack that can
         * be potentially deep already. So call softirq in its own stack
         * to prevent from any overrun.
         */
        do_softirq_own_stack();
#endif
    } else {
        wakeup_softirqd();
    }
}

invoke_softirq的執行過程就是判斷一下是否強制中斷執行緒化了這個是由CONFIG_IRQ_FORCED_THREADING巨集進行配置,如果是執行緒化了則直接喚醒ksoftirq執行緒,可以結合前面分析硬中斷的響應過程就可以明白,因為此時還是在中斷上下文的所以才有上面分析的__do_softirq的三個條件得處理機制就是不希望軟中斷過分長時間的在中斷上下文執行。因為現在的Linux核心已經把中斷服務函式強制執行緒化了所以如果中斷本身執行會先於軟體中斷執行,而軟體中斷的執行時在軟體中斷執行緒poll時得到執行的。到此軟體中斷的簡單執行過程分析就算完了,至於軟體中斷的管理介面另一篇部落格會來學習。

tasklet

tasklet是基於軟中斷實現的,因為軟中斷就是維護了一個軟中斷型別表而其中有兩個型別就是專門留給tasklet的高優先順序和普通優先順序任務的如下:

enum
{
    HI_SOFTIRQ=0,//最高優先順序的軟中斷型別
    TIMER_SOFTIRQ,//Timer定時器軟中斷
    NET_TX_SOFTIRQ,//傳送網路資料包軟中斷
    NET_RX_SOFTIRQ,//接收網路資料包軟中斷
    BLOCK_SOFTIRQ,
    BLOCK_IOPOLL_SOFTIRQ,//塊裝置軟中斷
    TASKLET_SOFTIRQ,//專門為tasklet機制準備的軟中斷
    SCHED_SOFTIRQ,//程序排程以及負載均衡軟中斷
    HRTIMER_SOFTIRQ,//高精度定時器軟中斷
    RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */----RCU服務軟中斷

    NR_SOFTIRQS
};

tasklet也是核心實現了一種軟體管理機制,所以來看其資料結構。

tasklet描述

struct tasklet_struct
{
    //多個tasklet串成一個連結串列。
    struct tasklet_struct *next;
     /*
     TASKLET_STATE_SCHED表示tasklet已經被排程,正準備執行; 
     TASKLET_STATE_RUN表示tasklet正在執行中。
    */
    unsigned long state;
    //0表示tasklet處於啟用狀態;非0表示該tasklet被禁止,不允許執行。
    atomic_t count;
   //該tasklet處理介面
    void (*func)(unsigned long);
   //傳遞給tasklet處理函式的引數
    unsigned long data;
};

除此之外核心還為每個CPU維護了兩個tasklet 連結串列如下,一個是高優先順序的tasklet另一個是低優先順序的。資料結構和初始化過程如下:

struct tasklet_head {
    struct tasklet_struct *head;
    struct tasklet_struct **tail;
};

static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);

並初始化各自的軟中斷服務函式並和CPU進行繫結。

void __init softirq_init(void)
{
    int cpu;

    for_each_possible_cpu(cpu) {
        per_cpu(tasklet_vec, cpu).tail =
            &per_cpu(tasklet_vec, cpu).head;
        per_cpu(tasklet_hi_vec, cpu).tail =
            &per_cpu(tasklet_hi_vec, cpu).head;
    }

    open_softirq(TASKLET_SOFTIRQ, tasklet_action);
    open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}

注意其中的open_softirq註冊了兩個軟中斷這就是tasklet的主要處理介面高優先順序和和普通優先順序的操作很相似所以這裡就只分析普通優先順序的tasklet。

static void tasklet_action(struct softirq_action *a)
{
    struct tasklet_struct *list;
    //注意這裡關閉了本CPU的中斷
    local_irq_disable();
    //讀取為這個CPU維護的tasklet連結串列
    list = __this_cpu_read(tasklet_vec.head);
    //重新初始化tasklet_vec,tasklet 是註冊一次執行一次???????
    __this_cpu_write(tasklet_vec.head, NULL);
    __this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
    local_irq_enable();
    //開中斷後開始處理
    while (list) {
        struct tasklet_struct *t = list;

        list = list->next;
        /*
            如果返回false,表示當前tasklet已經在其他CPU上執行,這一輪將會跳過此tasklet。確保同一個tasklet只能在一個CPU上執行
        */
        if (tasklet_trylock(t)) {
            //表示當前tasklet處於啟用狀態
            if (!atomic_read(&t->count)) {
                //清TASKLET_STATE_SCHED位;如果原來沒有被置位,則返回0,觸發BUG()。
                if (!test_and_clear_bit(TASKLET_STATE_SCHED,
                            &t->state))
                    BUG();
                //執行tasklet_struct 中的func成員即tasklet 介面
                t->func(t->data);
                tasklet_unlock(t);
                continue;
            }
            //執行完了解鎖
            tasklet_unlock(t);
        }

        local_irq_disable();
        t->next = NULL;
        /* 
        *   暫時不懂這裡是什麼操作,????????
        */
        *__this_cpu_read(tasklet_vec.tail) = t;
        __this_cpu_write(tasklet_vec.tail, &(t->next));
        __raise_softirq_irqoff(TASKLET_SOFTIRQ);
        local_irq_enable();
    }
}

通過程式碼可以清晰的明白Tasklet的執行過程就是從維護的tasklet但連結串列中的action介面依次呼叫執行。怎麼把tasklet的介面函式加入到,tasklet中實際上是通過介面tasklet_schedule()完成:

static inline void tasklet_schedule(struct tasklet_struct *t)
{
    //置TASKLET_STATE_SCHED位,如果原來未被置位,則呼叫__tasklet_schedule()。
    //注意這裡和上面的測試和設定和處理過程的TASKLET_STATE_SCHED處理形成呼應
    if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
        __tasklet_schedule(t);
}
//其中的__tasklet_schedule 實際上是一個連結串列操作的函式介面。
void __tasklet_schedule(struct tasklet_struct *t)
{
    unsigned long flags;

    local_irq_save(flags);
    t->next = NULL;
    //將t掛入到tasklet_vec連結串列中
    *__this_cpu_read(tasklet_vec.tail) = t;
    __this_cpu_write(tasklet_vec.tail, &(t->next));
    raise_softirq_irqoff(TASKLET_SOFTIRQ);
    local_irq_restore(flags);
}

到這裡軟中斷和基於軟中斷的tasklet的工作方式都基本大體上清楚了,最後來看幾個介面函式local_bh_disabled()/local_bh_enable()。

local_bh_disabled()

通過其呼叫過程就知道它實際上就是操作了核心的資料標記。

static inline void local_bh_disable(void)
{
    //增加softirq域計數,表示核心狀態進入了軟中斷上下文
    __local_bh_disable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);
}
// call
 void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)
{
    preempt_count_add(cnt);
    barrier();
}
//call
void __preempt_count_add(int val)
{
    *preempt_count_ptr() += val;
}
//call
volatile int *preempt_count_ptr(void)
{
    return &current_thread_info()->preempt_count;
}

local_bh_enable()

同理

static inline void local_bh_disable(void)
{
    __local_bh_disable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);
}

static __always_inline void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)
{
    //增加softirq域計數
    preempt_count_add(cnt);
    //防止編譯器做優化
    barrier();
}

static __always_inline void __preempt_count_add(int val)
{
    *preempt_count_ptr() += val;
}

總得來說軟中斷的設計是為了解決硬中斷在執行期間會關閉本CPU上的其他中斷的相應從而降低了系統的實時性的問題,除此之外結合軟中斷的實現細節可以明確同一個軟中斷程式可以在不同的CPU上併發執行,而同一個CPU上不會發生軟中斷之間相互搶佔。其次是軟中斷可能執行在中斷上下文所以軟中斷中是不能執行阻塞操作的。基於軟中斷的Tasklet也運行於軟體中斷上下文的,除此之外tasklet是不可重入的,這是由tasklet本身的實現決定的。

參考部落格:

https://www.cnblogs.com/arnoldlu/p/8659986.html

https://blog.csdn.net/zhangskd/article/details/21992933