SKB包的接收,從網絡卡驅動到TCP層的處理流程
在開發模組過程中,遇到一個問題:在NF_INET_LOCAL_IN鉤子處截獲資料包後,如果操作失敗,還要把這些截獲的資料包重新傳遞到TCP層處理。但是這個操作是在核心執行緒中完成,不知道會不會對正常的資料包接收過程產生影響?因此,需要知道資料包在從網路層傳遞到傳輸層時的上下文環境(指的是是否禁止核心搶佔、是否需要獲取鎖等)。為了解決這個問題,決定將資料包的接收過程從驅動程式到TCP層的處理流程梳理了一遍。
在文中的敘述過程中,將網絡卡驅動和網路層之間的部分,稱之為網路核心層,如下圖所示:
一、驅動程式
為了找到skb包傳遞到傳輸層的上下文,肯定要從資料包接收的下半部,也就是資料接收的軟中斷中去找,但是既然要梳理,就要梳理的徹底一點,確保沒有遺漏,因此從網絡卡的驅動程式開始。每個網絡卡都會有一箇中斷號,驅動程式中會有一個對應的中斷處理函式。當資料包到達時,網絡卡會向CPU傳送一箇中斷,然後會呼叫特定於網路的驅動程式,來接收資料包。選擇的驅動程式是3c501網絡卡的驅動,該驅動比較簡單,便於看出從驅動程式傳遞到網路核心層傳輸的過程。3c501網絡卡對應的中斷處理函式el_interrupt(),原始碼如下(只列出關鍵的部分):
static irqreturn_t el_interrupt(int irq, void *dev_id) { struct net_device *dev = dev_id; struct net_local *lp; int ioaddr; int axsr; /* Aux. status reg. */ ioaddr = dev->base_addr; lp = netdev_priv(dev); spin_lock(&lp->lock); ...... if (lp->txing) { ...... } else { /* * In receive mode. */ int rxsr = inb(RX_STATUS); ....... if (rxsr & RX_MISSED) dev->stats.rx_missed_errors++; else if (rxsr & RX_RUNT) { /* Handled to avoid board lock-up. */ dev->stats.rx_length_errors++; if (el_debug > 5) pr_debug("%s: runt.\n", dev->name); } else if (rxsr & RX_GOOD) { /* * Receive worked. */ el_receive(dev); } else { /* * Nothing? Something is broken! */ if (el_debug > 2) pr_debug("%s: No packet seen, rxsr=%02x **resetting 3c501***\n", dev->name, rxsr); el_reset(dev); } } /* * Move into receive mode */ outb(AX_RX, AX_CMD); outw(0x00, RX_BUF_CLR); inb(RX_STATUS); /* Be certain that interrupts are cleared. */ inb(TX_STATUS); spin_unlock(&lp->lock); out: return IRQ_HANDLED; }
中斷處理中呼叫inb()來獲取當前中斷的結果,如果是RX_GOOD,則呼叫el_receive()(3c501的接收函式)來處理接收資料的工作。從el_interrupt()中可以看出el_receive()返回後,驅動程式中對中斷的處理已經基本完成。因此,要繼續從el_receive()函式中去找前面提出的問題的答案。
el_receive()中的關鍵程式碼及分析如下:
static void el_receive(struct net_device *dev) { ...... outb(AX_SYS, AX_CMD); skb = dev_alloc_skb(pkt_len+2); /* * Start of frame */ outw(0x00, GP_LOW); if (skb == NULL) { pr_info("%s: Memory squeeze, dropping packet.\n", dev->name); dev->stats.rx_dropped++; return; } else { skb_reserve(skb, 2); /* Force 16 byte alignment */ /* * The read increments through the bytes. The interrupt * handler will fix the pointer when it returns to * receive mode. */ insb(DATAPORT, skb_put(skb, pkt_len), pkt_len); /* * 呼叫eth_type_trans()函式來獲取資料幀承載的 * 報文型別,並且將skb包中的資料起始位置 * 移到資料幀中報文的起始位置。如果 * 承載的是IP報文,則此時data指向的是IP首部的 * 地址。 */ skb->protocol = eth_type_trans(skb, dev); /* * 呼叫netif_rx()將接收的資料包傳遞到 * 網路核心層。 */ netif_rx(skb); dev->stats.rx_packets++; dev->stats.rx_bytes += pkt_len; } return; }
el_receive()首先分配一個sk_buff緩衝區,然後從網絡卡中拷貝資料,之後呼叫netif_rx()將skb包傳遞到網路核心層,至此網絡卡驅動中所做的工作已經完成了。也就是說,當netif_rx()返回後,資料包接收的上半部,也就完成了。從這裡開始我們就要開始進入網路核心層中的處理了。
小結:在網絡卡驅動的中斷處理函式中,也就是資料接收的上半部中,不可能存在和向傳輸層傳遞資料包相關的上下文。但是既然要梳理整個流程,就要徹底一些,以免漏掉什麼東西。通過對驅動程式的研究,可以知道三層、四層中的skb是怎麼來的,資料包是怎麼從驅動程式傳遞到核心的協議棧中。當然還有skb中一些成員是如何設定的。
二、網路核心層
從這裡開始,將更多的注意力放在處理過程中鎖的獲取、中斷的處理、以及核心搶佔等同步手段的處理上,找出向傳輸層傳遞資料包時的上下文環境,也就是呼叫tcp_v4_rcv()開始傳輸層處理時的上下文環境。
在3c501的網絡卡驅動程式中,看到將skb包傳遞到上層是通過netif_rx()函式來完成,每個網絡卡驅動程式在接收到一個包後,都會呼叫該介面來傳遞到上層。接下來看看這個介面的實現,原始碼及分析如下:
/**
* netif_rx - post buffer to the network code
* @skb: buffer to post
*
* This function receives a packet from a device driver and queues it for
* the upper (protocol) levels to process. It always succeeds. The buffer
* may be dropped during processing for congestion control or by the
* protocol layers.
*
* return values:
* NET_RX_SUCCESS (no congestion)
* NET_RX_DROP (packet was dropped)
*
*/
/*
* 資料到來時,會產生中斷,首先執行特定網絡卡的中斷
* 處理程式,然後再執行接收函式分配新的套接字緩衝
* 區,然後通過呼叫netif_rx來講資料傳到上層
* 呼叫該函式標誌著控制由特定於網絡卡的程式碼轉移到了
* 網路層的通用介面部分。該函式的作用是,將接收到
* 的分組放置到一個特定於CPU的等待佇列上,並退出中
* 斷上下文,使得CPU可以執行其他任務
*/
int netif_rx(struct sk_buff *skb)
{
struct softnet_data *queue;
unsigned long flags;
/* if netpoll wants it, pretend we never saw it */
if (netpoll_rx(skb))
return NET_RX_DROP;
/*
* 如果沒有設定資料包到達的時間,
* 則獲取當前的時鐘時間設定到tstamp上
*/
if (!skb->tstamp.tv64)
net_timestamp(skb);
/*
* The code is rearranged so that the path is the most
* short when CPU is congested, but is still operating.
*/
local_irq_save(flags);
/*
* 每個CPU都有一個softnet_data型別變數,
* 用來管理進出分組的等待佇列
*/
queue = &__get_cpu_var(softnet_data);
/*
* 記錄當前CPU上接收的資料包的個數
*/
__get_cpu_var(netdev_rx_stat).total++;
if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
/*
* 如果接受佇列不為空,說明當前正在處理資料包,
* 則不需要觸發
* 軟中斷操作,直接將資料包放到接收
* 佇列中,待前面的資料包處理之後,會立即處理
* 當前的資料包。
*/
if (queue->input_pkt_queue.qlen) {
enqueue:
__skb_queue_tail(&queue->input_pkt_queue, skb);
local_irq_restore(flags);
/*
* 至此,中斷處理,也就是資料包接收的上半部已經
* 已經基本處理完成,剩下的工作交給軟中斷來處理。
*/
return NET_RX_SUCCESS;
}
/*
* 如果NAPI程式尚未執行,則重新排程使其開始
* 輪詢,並觸發NET_RX_SOFTIRQ軟中斷。
*/
napi_schedule(&queue->backlog);
goto enqueue;
}
/*
* 如果當前CPU的接受佇列已滿,則丟棄資料包。
* 並記錄當前CPU上丟棄的資料包個數
*/
__get_cpu_var(netdev_rx_stat).dropped++;
local_irq_restore(flags);
kfree_skb(skb);
return NET_RX_DROP;
}
每個CPU都有一個管理進出分組的softnet_data結構的例項,netif_rx()將skb包放在當前CPU的接收佇列中,然後呼叫napi_schedule()來將裝置放置在NAPI的輪詢佇列中,並觸發NET_RX_SOFTIRQ軟中斷來進行資料包接收的下半部的處理,也就是在這個過程中找到在向傳輸層傳遞資料包時的上下文。
NET_RX_SOFTIRQ軟中斷對應的處理函式是net_rx_action(),參見net_dev_init()。在解決開始時提到的問題之前,需要先找到軟中斷的處理函式被呼叫時的上下文。
有幾種方法可開啟軟中斷處理,但這些都歸結為呼叫do_softirq()函式。其中一種方式就是在軟中斷守護程序(每個CPU都會有一個守護程序)中呼叫,就以此種方式為切入點來探究。軟中斷守護程序的處理函式時ksoftirqd(),其關鍵的程式碼如下:
static int ksoftirqd(void * __bind_cpu)
{
......
while (!kthread_should_stop()) {
preempt_disable();
......
while (local_softirq_pending()) {
......
do_softirq();
preempt_enable_no_resched();
cond_resched();
preempt_disable();
......
}
preempt_enable();
......
}
......
}
static int ksoftirqd(void * __bind_cpu)
{
......
while (!kthread_should_stop()) {
preempt_disable();
......
while (local_softirq_pending()) {
......
do_softirq();
preempt_enable_no_resched();
cond_resched();
preempt_disable();
......
}
preempt_enable();
......
}
......
}
從這裡可以看出,在開始處理軟中斷之前,要先呼叫preempt_disable()來禁止核心搶佔,這是我們找到的一個需要關注的上下文環境,也就是在協議層中接收資料時,首先要作的就是禁止核心搶佔(當然是不是這樣,還要看後面的處理,這裡姑且這麼認為吧)。
接下來看do_softirq()中的處理,原始碼如下:
asmlinkage void do_softirq(void)
{
__u32 pending;
unsigned long flags;
/*
* 確認當前不處於中斷上下文中(當然,即不涉及
* 硬體中斷)。如果處於中斷上下文,則立即結束。
* 因為軟中斷用於執行ISR中非時間關鍵部分,所以
* 其程式碼本身一定不能在中斷處理程式內呼叫。
*/
if (in_interrupt())
return;
local_irq_save(flags);
/*
* 確定當前CPU軟中斷點陣圖中所有置位的位元位。
*/
pending = local_softirq_pending();
/*
* 如果有軟中斷等待處理,則呼叫__do_softirq()。
*/
if (pending)
__do_softirq();
local_irq_restore(flags);
}
在開始呼叫__do_softirq()作進一步的處理之前,要先呼叫local_irq_save()遮蔽所有中斷,並且儲存當前的中斷狀態,這是第二個我們需要關注的上下文環境。接下來看__do_softirq(),關鍵程式碼如下:
asmlinkage void __do_softirq(void)
{
......
__local_bh_disable((unsigned long)__builtin_return_address(0));
......
local_irq_enable();
h = softirq_vec;
do {
if (pending & 1) {
......
h->action(h);
......
}
h++;
pending >>= 1;
} while (pending);
local_irq_disable();
......
_local_bh_enable();
}
__do_softirq()在呼叫軟中斷對應的action之前,會先呼叫__local_bh_disable()來和其他下半部操作互斥,然後呼叫local_irq_enable()來啟用中斷(注意這個操作和local_irq_restore()不一樣),這是第三個我們需要關注的上文環境。在__do_softirq()中會呼叫到NET_RX_SOFTIRQ軟中斷對應的
處理函式net_rx_action()。接下來看net_rx_action()函式的處理
static void net_rx_action(struct softirq_action *h)
{
......
local_irq_disable();
while (!list_empty(list)) {
......
local_irq_enable();
......
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
work = n->poll(n, weight);
trace_napi_poll(n);
}
......
local_irq_disable();
......
}
out:
local_irq_enable();
......
}
net_rx_action()中進入迴圈前呼叫local_irq_disable()來關閉軟中斷,但是在呼叫NAPI輪詢佇列上裝置的poll函式前又呼叫local_irq_enable()來開啟中斷,因此在net_rx_action()中poll函式的執行上下文(指鎖、中斷等同步手段的環境)中和__do_softirq()中保持一致,沒有發生變化。接下來要關注的是net_rx_action()中呼叫poll介面,預設情況下呼叫的函式是process_backlog(),原始碼如下:
static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *queue = &__get_cpu_var(softnet_data);
unsigned long start_time = jiffies;
napi->weight = weight_p;
do {
struct sk_buff *skb;
local_irq_disable();
/*
* 從當前CPU的接收佇列中取出一個SKB包。
*/
skb = __skb_dequeue(&queue->input_pkt_queue);
/*
* 如果所有的資料已處理完成,則呼叫
* __napi_complete()來將當前裝置移除輪詢佇列。
*/
if (!skb) {
__napi_complete(napi);
local_irq_enable();
break;
}
local_irq_enable();
netif_receive_skb(skb);
/*
* 如果當前的處理次數小於裝置的權重,並且
* 處理時間不超過1個jiffies時間(如果HZ為1000,則
* 相當於是1毫秒),則處理下一個SKB包。
*/
} while (++work < quota && jiffies == start_time);
return work;
}
process_backlog()中首先從CPU的接收佇列上,然後呼叫netif_receive_skb()將SKB包傳遞到網路層,所以netif_receive_skb()函式就是skb包在網路核心層的最後一次處理。netif_receive_skb()的關鍵程式碼如下:
int netif_receive_skb(struct sk_buff *skb)
{
......
rcu_read_lock();
......
type = skb->protocol;
list_for_each_entry_rcu(ptype,
&ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {
if (ptype->type == type &&
(ptype->dev == null_or_orig || ptype->dev == skb->dev ||
ptype->dev == orig_dev)) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
}
if (pt_prev) {
ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
} else {
......
}
out:
rcu_read_unlock();
return ret;
}
netif_receive_skb()中在呼叫三層的接收函式之前,需要呼叫rcu_read_lock()進入讀臨界區,這是第四個我們需要關注的上文環境。如果skb的三層協議型別是IP協議,則pt_prev->func()呼叫的就是ip_rcv()。
小結:經過上面的分析,有必要總結一下從軟中斷守護程序的處理函式ksoftirqd()到netif_receive_skb()中呼叫ip_rcv()將skb包傳遞到IP層時,ip_rcv()函式所處的上下文環境,下面的圖列出了所處理的環境(圖中只包含獲取鎖或禁止中斷等進入保護區的操作,釋放的操作相應地一一對應,不在圖中列出):
三、網路層網路層中主要關注IPv4協議,其接收函式時ip_rcv()。ip_rcv()中首先判斷skb包是否是傳送給本機,如果是傳送給其他機器,則直接丟棄,然後檢查IP資料包是否是正常的IP包,如果是正常的資料包,則呼叫ip_rcv_finish()繼續處理(忽略鉤子的處理),如下所示:
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
......
<span style="white-space:pre"> </span>return NF_HOOK(PF_INET, NF_INET_PRE_ROUTING, skb, dev, NULL,
ip_rcv_finish);
......
}
在ip_rcv()中沒有類似互斥鎖或中斷相關的同步操作,繼續看ip_rcv_finish()中的處理。ip_rcv_finish()中首先判斷skb包中是否設定路由快取,如果沒有設定,呼叫ip_route_input()來查詢路由項,然後呼叫dst_input()來處理skb包。在 ip_rcv_finish()函式中也沒有類似獲取鎖或中斷相關的同步操作,繼續看dst_input()函式。
dst_input()原始碼如下:
static inline int dst_input(struct sk_buff *skb)
{
return skb_dst(skb)->input(skb);
}
其中skb_dst(skb)是獲取skb的路由快取項,如果資料包是傳送到本地,input介面會設定為ip_local_deliver();如果需要轉發,則設定的是ip_forward()。因為要研究的是傳送到傳輸層時的上下文,因此假設這裡設定的ip_local_deliver()。
ip_local_deliver()首先檢查是否需要組裝分片,如果需要組裝分片,則呼叫ip_defrag()來重新組合各個分片,最後經過鉤子處理後,呼叫ip_local_deliver_finish()來將skb包傳遞到傳輸層,如下所示:
int ip_local_deliver(struct sk_buff *skb)
{
/*
* Reassemble IP fragments.
*/
if (ip_hdr(skb)->frag_off & htons(IP_MF | IP_OFFSET)) {
if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
}
return NF_HOOK(PF_INET, NF_INET_LOCAL_IN, skb, skb->dev, NULL,
ip_local_deliver_finish);
}
ip_local_deliver()中同樣沒有使用核心中的同步手段,也就是沒有我們關心的上下文環境(獲取鎖、開啟或禁止中斷等),接下來就剩下ip_local_deliver_finish()函數了。
ip_local_deliver_finish()中關鍵程式碼如下所示:
static int ip_local_deliver_finish(struct sk_buff *skb)
{
......
rcu_read_lock();
{
......
resubmit:
raw = raw_local_deliver(skb, protocol);
hash = protocol & (MAX_INET_PROTOS - 1);
ipprot = rcu_dereference(inet_protos[hash]);
if (ipprot != NULL) {
.......
ret = ipprot->handler(skb);
if (ret < 0) {
protocol = -ret;
goto resubmit;
}
IP_INC_STATS_BH(net, IPSTATS_MIB_INDELIVERS);
} else {
......
}
}
out:
rcu_read_unlock();
return 0;
}
ip_local_deliver_finish()首先根據IP包承載的報文協議型別找到對應的net_protocol例項,然後呼叫其handler介面。ip_local_deliver_finish()中有我們關心的上下文操作,也就是對rcu_read_lock()的呼叫。如果是TCP協議,則handler為tcp_v4_rcv()。tcp_v4_rcv()是TCP協議的接收函式,該函式被呼叫時的上下文就是我們一直在探究的向TCP層傳送資料包時的上下文。至此,我們完成了從網路驅動到向TCP傳輸資料包的過程的梳理,及tcp_v4_rcv()執行時的上下文。
四、總結
tcp_v4_rcv()執行時的上下文,就是在圖1-1中所示的ip_rcv()的執行上下文中再新增上rcu_read_lock()(ip_local_deliver_finish()中呼叫)的處理,上面已經說得很清楚了,就不再畫圖了。
最後把從網絡卡驅動到TCP層的接收處理的流程列出來,如下圖所示: