1. 程式人生 > >SKB包的接收,從網絡卡驅動到TCP層的處理流程

SKB包的接收,從網絡卡驅動到TCP層的處理流程

在開發模組過程中,遇到一個問題:在NF_INET_LOCAL_IN鉤子處截獲資料包後,如果操作失敗,還要把這些截獲的資料包重新傳遞到TCP層處理。但是這個操作是在核心執行緒中完成,不知道會不會對正常的資料包接收過程產生影響?因此,需要知道資料包在從網路層傳遞到傳輸層時的上下文環境(指的是是否禁止核心搶佔、是否需要獲取鎖等)。為了解決這個問題,決定將資料包的接收過程從驅動程式到TCP層的處理流程梳理了一遍。

  在文中的敘述過程中,將網絡卡驅動和網路層之間的部分,稱之為網路核心層,如下圖所示:


一、驅動程式

  為了找到skb包傳遞到傳輸層的上下文,肯定要從資料包接收的下半部,也就是資料接收的軟中斷中去找,但是既然要梳理,就要梳理的徹底一點,確保沒有遺漏,因此從網絡卡的驅動程式開始。每個網絡卡都會有一箇中斷號,驅動程式中會有一個對應的中斷處理函式。當資料包到達時,網絡卡會向CPU傳送一箇中斷,然後會呼叫特定於網路的驅動程式,來接收資料包。選擇的驅動程式是3c501網絡卡的驅動,該驅動比較簡單,便於看出從驅動程式傳遞到網路核心層傳輸的過程。3c501網絡卡對應的中斷處理函式el_interrupt(),原始碼如下(只列出關鍵的部分):

static irqreturn_t el_interrupt(int irq, void *dev_id)
{
	struct net_device *dev = dev_id;
	struct net_local *lp;
	int ioaddr;
	int axsr;			/* Aux. status reg. */

	ioaddr = dev->base_addr;
	lp = netdev_priv(dev);

	spin_lock(&lp->lock); 
    ......

	if (lp->txing) {
	
		......
		
	} else {
		/*
		 *	In receive mode.
		 */

		int rxsr = inb(RX_STATUS);
		
		.......
		
		if (rxsr & RX_MISSED)
			dev->stats.rx_missed_errors++;
		else if (rxsr & RX_RUNT) {
			/* Handled to avoid board lock-up. */
			dev->stats.rx_length_errors++;
			if (el_debug > 5)
				pr_debug("%s: runt.\n", dev->name);
		} else if (rxsr & RX_GOOD) {
			/*
			 *	Receive worked.
			 */
			el_receive(dev);
		} else {
			/*
			 *	Nothing?  Something is broken!
			 */
			if (el_debug > 2)
				pr_debug("%s: No packet seen, rxsr=%02x **resetting 3c501***\n",
					dev->name, rxsr);
			el_reset(dev);
		}
	}

	/*
	 *	Move into receive mode
	 */

	outb(AX_RX, AX_CMD);
	outw(0x00, RX_BUF_CLR);
	inb(RX_STATUS);		/* Be certain that interrupts are cleared. */
	inb(TX_STATUS);
	spin_unlock(&lp->lock);
out:
	return IRQ_HANDLED;
}

中斷處理中呼叫inb()來獲取當前中斷的結果,如果是RX_GOOD,則呼叫el_receive()(3c501的接收函式)來處理接收資料的工作。從el_interrupt()中可以看出el_receive()返回後,驅動程式中對中斷的處理已經基本完成。因此,要繼續從el_receive()函式中去找前面提出的問題的答案。

  el_receive()中的關鍵程式碼及分析如下:

static void el_receive(struct net_device *dev)
{
    ......
    
	outb(AX_SYS, AX_CMD);
	skb = dev_alloc_skb(pkt_len+2);

	/*
	 *	Start of frame
	 */

	outw(0x00, GP_LOW);
	if (skb == NULL) {
		pr_info("%s: Memory squeeze, dropping packet.\n", dev->name);
		dev->stats.rx_dropped++;
		return;
	} else {
		skb_reserve(skb, 2);	/* Force 16 byte alignment */
		/*
		 *	The read increments through the bytes. The interrupt
		 *	handler will fix the pointer when it returns to
		 *	receive mode.
		 */
		insb(DATAPORT, skb_put(skb, pkt_len), pkt_len);
		/*
		 * 呼叫eth_type_trans()函式來獲取資料幀承載的
		 * 報文型別,並且將skb包中的資料起始位置
		 * 移到資料幀中報文的起始位置。如果
		 * 承載的是IP報文,則此時data指向的是IP首部的
		 * 地址。
		 */
		skb->protocol = eth_type_trans(skb, dev);
		/*
		 * 呼叫netif_rx()將接收的資料包傳遞到
		 * 網路核心層。
		 */
		netif_rx(skb);
		dev->stats.rx_packets++;
		dev->stats.rx_bytes += pkt_len;
	}
	return;
}

el_receive()首先分配一個sk_buff緩衝區,然後從網絡卡中拷貝資料,之後呼叫netif_rx()將skb包傳遞到網路核心層,至此網絡卡驅動中所做的工作已經完成了。也就是說,當netif_rx()返回後,資料包接收的上半部,也就完成了。從這裡開始我們就要開始進入網路核心層中的處理了。

小結:在網絡卡驅動的中斷處理函式中,也就是資料接收的上半部中,不可能存在和向傳輸層傳遞資料包相關的上下文。但是既然要梳理整個流程,就要徹底一些,以免漏掉什麼東西。通過對驅動程式的研究,可以知道三層、四層中的skb是怎麼來的,資料包是怎麼從驅動程式傳遞到核心的協議棧中。當然還有skb中一些成員是如何設定的。

二、網路核心層

 從這裡開始,將更多的注意力放在處理過程中鎖的獲取、中斷的處理、以及核心搶佔等同步手段的處理上,找出向傳輸層傳遞資料包時的上下文環境,也就是呼叫tcp_v4_rcv()開始傳輸層處理時的上下文環境。

  在3c501的網絡卡驅動程式中,看到將skb包傳遞到上層是通過netif_rx()函式來完成,每個網絡卡驅動程式在接收到一個包後,都會呼叫該介面來傳遞到上層。接下來看看這個介面的實現,原始碼及分析如下:

/**
 *	netif_rx	-	post buffer to the network code
 *	@skb: buffer to post
 *
 *	This function receives a packet from a device driver and queues it for
 *	the upper (protocol) levels to process.  It always succeeds. The buffer
 *	may be dropped during processing for congestion control or by the
 *	protocol layers.
 *
 *	return values:
 *	NET_RX_SUCCESS	(no congestion)
 *	NET_RX_DROP     (packet was dropped)
 *
 */
/*
 * 資料到來時,會產生中斷,首先執行特定網絡卡的中斷
 * 處理程式,然後再執行接收函式分配新的套接字緩衝
 * 區,然後通過呼叫netif_rx來講資料傳到上層
 * 呼叫該函式標誌著控制由特定於網絡卡的程式碼轉移到了
 * 網路層的通用介面部分。該函式的作用是,將接收到
 * 的分組放置到一個特定於CPU的等待佇列上,並退出中
 * 斷上下文,使得CPU可以執行其他任務
 */
int netif_rx(struct sk_buff *skb)
{
	struct softnet_data *queue;
	unsigned long flags;

	/* if netpoll wants it, pretend we never saw it */
	if (netpoll_rx(skb))
		return NET_RX_DROP;

	/*
	 * 如果沒有設定資料包到達的時間,
	 * 則獲取當前的時鐘時間設定到tstamp上
	 */
	if (!skb->tstamp.tv64)
		net_timestamp(skb);

	/*
	 * The code is rearranged so that the path is the most
	 * short when CPU is congested, but is still operating.
	 */
	local_irq_save(flags);
	/*
	 * 每個CPU都有一個softnet_data型別變數,
	 * 用來管理進出分組的等待佇列
	 */
	queue = &__get_cpu_var(softnet_data);

	/*
	 * 記錄當前CPU上接收的資料包的個數
	 */
	__get_cpu_var(netdev_rx_stat).total++;
	if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
		/*
		 * 如果接受佇列不為空,說明當前正在處理資料包,
		 * 則不需要觸發
		 * 軟中斷操作,直接將資料包放到接收
		 * 佇列中,待前面的資料包處理之後,會立即處理
		 * 當前的資料包。
		 */
		if (queue->input_pkt_queue.qlen) {
enqueue:
			__skb_queue_tail(&queue->input_pkt_queue, skb);
			local_irq_restore(flags);
			/*
			 * 至此,中斷處理,也就是資料包接收的上半部已經
			 * 已經基本處理完成,剩下的工作交給軟中斷來處理。
			 */
			return NET_RX_SUCCESS;
		}

		/*
		 * 如果NAPI程式尚未執行,則重新排程使其開始
		 * 輪詢,並觸發NET_RX_SOFTIRQ軟中斷。
		 */
		napi_schedule(&queue->backlog);
		goto enqueue;
	}

       /*
	 * 如果當前CPU的接受佇列已滿,則丟棄資料包。
	 * 並記錄當前CPU上丟棄的資料包個數
	 */
	__get_cpu_var(netdev_rx_stat).dropped++;
	local_irq_restore(flags);

	kfree_skb(skb);
	return NET_RX_DROP;
}

每個CPU都有一個管理進出分組的softnet_data結構的例項,netif_rx()將skb包放在當前CPU的接收佇列中,然後呼叫napi_schedule()來將裝置放置在NAPI的輪詢佇列中,並觸發NET_RX_SOFTIRQ軟中斷來進行資料包接收的下半部的處理,也就是在這個過程中找到在向傳輸層傳遞資料包時的上下文。

  NET_RX_SOFTIRQ軟中斷對應的處理函式是net_rx_action(),參見net_dev_init()。在解決開始時提到的問題之前,需要先找到軟中斷的處理函式被呼叫時的上下文。

有幾種方法可開啟軟中斷處理,但這些都歸結為呼叫do_softirq()函式。其中一種方式就是在軟中斷守護程序(每個CPU都會有一個守護程序)中呼叫,就以此種方式為切入點來探究。軟中斷守護程序的處理函式時ksoftirqd(),其關鍵的程式碼如下:

static int ksoftirqd(void * __bind_cpu)
{
    ......

	while (!kthread_should_stop()) {
		preempt_disable();
		
		......

		while (local_softirq_pending()) {
		    
		    ......
		    
			do_softirq();
			preempt_enable_no_resched();
			cond_resched();
			preempt_disable();
			
			......
		}
		preempt_enable();
		......
	}
	
    ......
}

static int ksoftirqd(void * __bind_cpu)
{
    ......

	while (!kthread_should_stop()) {
		preempt_disable();
		
		......

		while (local_softirq_pending()) {
		    
		    ......
		    
			do_softirq();
			preempt_enable_no_resched();
			cond_resched();
			preempt_disable();
			
			......
		}
		preempt_enable();
		......
	}
	
    ......
}

從這裡可以看出,在開始處理軟中斷之前,要先呼叫preempt_disable()來禁止核心搶佔,這是我們找到的一個需要關注的上下文環境,也就是在協議層中接收資料時,首先要作的就是禁止核心搶佔(當然是不是這樣,還要看後面的處理,這裡姑且這麼認為吧)。

接下來看do_softirq()中的處理,原始碼如下:

asmlinkage void do_softirq(void)
{
	__u32 pending;
	unsigned long flags;

	/*
	 * 確認當前不處於中斷上下文中(當然,即不涉及
	 * 硬體中斷)。如果處於中斷上下文,則立即結束。
	 * 因為軟中斷用於執行ISR中非時間關鍵部分,所以
	 * 其程式碼本身一定不能在中斷處理程式內呼叫。
	 */
	if (in_interrupt())
		return;

	local_irq_save(flags);

        /*
         *  確定當前CPU軟中斷點陣圖中所有置位的位元位。
         */
	pending = local_softirq_pending();

	/*
	 * 如果有軟中斷等待處理,則呼叫__do_softirq()。
	 */
	if (pending)
		__do_softirq();

	local_irq_restore(flags);
}

在開始呼叫__do_softirq()作進一步的處理之前,要先呼叫local_irq_save()遮蔽所有中斷,並且儲存當前的中斷狀態,這是第二個我們需要關注的上下文環境。接下來看__do_softirq(),關鍵程式碼如下:

asmlinkage void __do_softirq(void)
{
    ......
	__local_bh_disable((unsigned long)__builtin_return_address(0));	
	......
	local_irq_enable();

	h = softirq_vec;

	do {
		if (pending & 1) {
			......
			h->action(h);
			......
		}
		h++;
		pending >>= 1;
	} while (pending);

	local_irq_disable();
	......
	_local_bh_enable();
}

__do_softirq()在呼叫軟中斷對應的action之前,會先呼叫__local_bh_disable()來和其他下半部操作互斥,然後呼叫local_irq_enable()來啟用中斷(注意這個操作和local_irq_restore()不一樣),這是第三個我們需要關注的上文環境。在__do_softirq()中會呼叫到NET_RX_SOFTIRQ軟中斷對應的

處理函式net_rx_action()。接下來看net_rx_action()函式的處理

static void net_rx_action(struct softirq_action *h)
{
    ......
	local_irq_disable();

	while (!list_empty(list)) {
	    ......
		local_irq_enable();
		......
		
		if (test_bit(NAPI_STATE_SCHED, &n->state)) {
			work = n->poll(n, weight);
			trace_napi_poll(n);
		}
        
        ......
		local_irq_disable();
		......
	}
out:
	local_irq_enable();
	......
}

net_rx_action()中進入迴圈前呼叫local_irq_disable()來關閉軟中斷,但是在呼叫NAPI輪詢佇列上裝置的poll函式前又呼叫local_irq_enable()來開啟中斷,因此在net_rx_action()中poll函式的執行上下文(指鎖、中斷等同步手段的環境)中和__do_softirq()中保持一致,沒有發生變化。接下來要關注的是net_rx_action()中呼叫poll介面,預設情況下呼叫的函式是process_backlog(),原始碼如下:

static int process_backlog(struct napi_struct *napi, int quota)
{
	int work = 0;
	struct softnet_data *queue = &__get_cpu_var(softnet_data);
	unsigned long start_time = jiffies;

	napi->weight = weight_p;
	do {
		struct sk_buff *skb;

		local_irq_disable();
              /*
               * 從當前CPU的接收佇列中取出一個SKB包。
               */
		skb = __skb_dequeue(&queue->input_pkt_queue);
		/*
		 * 如果所有的資料已處理完成,則呼叫
		 * __napi_complete()來將當前裝置移除輪詢佇列。
		 */
		if (!skb) {
			__napi_complete(napi);
			local_irq_enable();
			break;
		}
		local_irq_enable();

		netif_receive_skb(skb);
	/*
	 * 如果當前的處理次數小於裝置的權重,並且
	 * 處理時間不超過1個jiffies時間(如果HZ為1000,則
	 * 相當於是1毫秒),則處理下一個SKB包。
	 */
	} while (++work < quota && jiffies == start_time);

	return work;
}

process_backlog()中首先從CPU的接收佇列上,然後呼叫netif_receive_skb()將SKB包傳遞到網路層,所以netif_receive_skb()函式就是skb包在網路核心層的最後一次處理。netif_receive_skb()的關鍵程式碼如下:
int netif_receive_skb(struct sk_buff *skb)
{
    ......
	rcu_read_lock();
	......
	
	type = skb->protocol;
	list_for_each_entry_rcu(ptype,
			&ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {
		if (ptype->type == type &&
		    (ptype->dev == null_or_orig || ptype->dev == skb->dev ||
		     ptype->dev == orig_dev)) {
			if (pt_prev)
				ret = deliver_skb(skb, pt_prev, orig_dev);
			pt_prev = ptype;
		}
	}

	if (pt_prev) {
		ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
	} else {
		......
	}

out:
	rcu_read_unlock();
	return ret;
}

netif_receive_skb()中在呼叫三層的接收函式之前,需要呼叫rcu_read_lock()進入讀臨界區,這是第四個我們需要關注的上文環境。如果skb的三層協議型別是IP協議,則pt_prev->func()呼叫的就是ip_rcv()。

小結:經過上面的分析,有必要總結一下從軟中斷守護程序的處理函式ksoftirqd()到netif_receive_skb()中呼叫ip_rcv()將skb包傳遞到IP層時,ip_rcv()函式所處的上下文環境,下面的圖列出了所處理的環境(圖中只包含獲取鎖或禁止中斷等進入保護區的操作,釋放的操作相應地一一對應,不在圖中列出):

三、網路層
  網路層中主要關注IPv4協議,其接收函式時ip_rcv()。ip_rcv()中首先判斷skb包是否是傳送給本機,如果是傳送給其他機器,則直接丟棄,然後檢查IP資料包是否是正常的IP包,如果是正常的資料包,則呼叫ip_rcv_finish()繼續處理(忽略鉤子的處理),如下所示:
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev)
{
    ......
<span style="white-space:pre">	</span>return NF_HOOK(PF_INET, NF_INET_PRE_ROUTING, skb, dev, NULL,
		       ip_rcv_finish);  
    ......
}
在ip_rcv()中沒有類似互斥鎖或中斷相關的同步操作,繼續看ip_rcv_finish()中的處理。
  ip_rcv_finish()中首先判斷skb包中是否設定路由快取,如果沒有設定,呼叫ip_route_input()來查詢路由項,然後呼叫dst_input()來處理skb包。在 ip_rcv_finish()函式中也沒有類似獲取鎖或中斷相關的同步操作,繼續看dst_input()函式。

dst_input()原始碼如下:

static inline int dst_input(struct sk_buff *skb)
{
	return skb_dst(skb)->input(skb);
}

其中skb_dst(skb)是獲取skb的路由快取項,如果資料包是傳送到本地,input介面會設定為ip_local_deliver();如果需要轉發,則設定的是ip_forward()。因為要研究的是傳送到傳輸層時的上下文,因此假設這裡設定的ip_local_deliver()。

 ip_local_deliver()首先檢查是否需要組裝分片,如果需要組裝分片,則呼叫ip_defrag()來重新組合各個分片,最後經過鉤子處理後,呼叫ip_local_deliver_finish()來將skb包傳遞到傳輸層,如下所示:

int ip_local_deliver(struct sk_buff *skb)
{
	/*
	 *	Reassemble IP fragments.
	 */
	if (ip_hdr(skb)->frag_off & htons(IP_MF | IP_OFFSET)) {
		if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
			return 0;
	}

	return NF_HOOK(PF_INET, NF_INET_LOCAL_IN, skb, skb->dev, NULL,
		       ip_local_deliver_finish);
}

ip_local_deliver()中同樣沒有使用核心中的同步手段,也就是沒有我們關心的上下文環境(獲取鎖、開啟或禁止中斷等),接下來就剩下ip_local_deliver_finish()函數了。

ip_local_deliver_finish()中關鍵程式碼如下所示:

static int ip_local_deliver_finish(struct sk_buff *skb)
{
    ......
	rcu_read_lock();
	{
        ......
	resubmit:
		raw = raw_local_deliver(skb, protocol);

		hash = protocol & (MAX_INET_PROTOS - 1);
		ipprot = rcu_dereference(inet_protos[hash]);
		if (ipprot != NULL) {
			.......
			
			ret = ipprot->handler(skb);
			if (ret < 0) {
				protocol = -ret;
				goto resubmit;
			}
			IP_INC_STATS_BH(net, IPSTATS_MIB_INDELIVERS);
		} else {
		    ......
		}
	}
 out:
	rcu_read_unlock();

	return 0;
}

ip_local_deliver_finish()首先根據IP包承載的報文協議型別找到對應的net_protocol例項,然後呼叫其handler介面。ip_local_deliver_finish()中有我們關心的上下文操作,也就是對rcu_read_lock()的呼叫。如果是TCP協議,則handler為tcp_v4_rcv()。tcp_v4_rcv()是TCP協議的接收函式,該函式被呼叫時的上下文就是我們一直在探究的向TCP層傳送資料包時的上下文。至此,我們完成了從網路驅動到向TCP傳輸資料包的過程的梳理,及tcp_v4_rcv()執行時的上下文。

四、總結
  tcp_v4_rcv()執行時的上下文,就是在圖1-1中所示的ip_rcv()的執行上下文中再新增上rcu_read_lock()(ip_local_deliver_finish()中呼叫)的處理,上面已經說得很清楚了,就不再畫圖了。

最後把從網絡卡驅動到TCP層的接收處理的流程列出來,如下圖所示: