1. 程式人生 > >裝置介面層之資料包傳送

裝置介面層之資料包傳送

這篇筆記記錄了裝置介面層傳送資料包的過程。這裡不會單獨列舉發送過程中使用到的一些資料結構,而是直接跟蹤程式碼,因為傳送過程中使用到的很多資料結構在接收部分的描述中已經介紹過了,這裡可以對比參考:裝置介面層之資料包接收.

1. 裝置介面層傳送介面

仔細看下dev_queue_xmit()的註釋很有必要。

/**
 *	dev_queue_xmit - transmit a buffer
 *	@skb: buffer to transmit
 *
 *	Queue a buffer for transmission to a network device. The caller must
 *	have set the device and priority and built the buffer before calling
 *	this function. The function can be called from an interrupt.
 *
 *	A negative errno code is returned on a failure. A success does not
 *	guarantee the frame will be transmitted as it may be dropped due
 *	to congestion or traffic shaping.
 *
 * -----------------------------------------------------------------------------------
 *      I notice this method can also return errors from the queue disciplines,
 *      including NET_XMIT_DROP, which is a positive value.  So, errors can also
 *      be positive.
 *
 *      Regardless of the return value, the skb is consumed, so it is currently
 *      difficult to retry a send to this method.  (You can bump the ref count
 *      before sending to hold a reference for retry if you are careful.)
 *
 *      When calling this method, interrupts MUST be enabled.  This is because
 *      the BH enable code must have IRQs enabled so that it will not deadlock.
 *          --BLG
 */
//經過層三路由,該資料包已經確定要從該裝置傳送出去,所以唯一的入參就是要傳送的資料包 int dev_queue_xmit(struct sk_buff *skb) { //dev指向要傳送該資料包的裝置 struct net_device *dev = skb->dev; struct Qdisc *q; int rc = -ENOMEM; /* GSO will handle the following emulations directly. */ //如果網路裝置本身可以處理GSO資料包,那麼後面可能的線性化、校驗和計算都可以由硬體執行 if (netif_needs_gso
(dev, skb)) goto gso; //資料包有分片但是硬體不支援自動處理分片,那麼需要軟體將資料包線性化(即將所有資料放 //到一個地址連續的快取區中) if (skb_shinfo(skb)->frag_list && !(dev->features & NETIF_F_FRAGLIST) && __skb_linearize(skb)) goto out_kfree_skb; /* Fragmented skb is linearized if device does not support SG, * or if at least one of fragments is in highmem and device * does not support DMA from it. */
//硬體不支援SG,或者不支援非連續的DMA(不懂),同樣要軟體進行線性化處理 if (skb_shinfo(skb)->nr_frags && (!(dev->features & NETIF_F_SG) || illegal_highdma(dev, skb)) && __skb_linearize(skb)) goto out_kfree_skb; /* If packet is not checksummed and device does not support * checksumming for this protocol, complete checksumming here. */ //高層協議沒有進行完整的校驗和計算,那麼需要根據硬體是否支援自動校驗,呼叫 //skb_checksum_help()進行軟體校驗 if (skb->ip_summed == CHECKSUM_PARTIAL) { skb_set_transport_header(skb, skb->csum_start - skb_headroom(skb)); if (!(dev->features & NETIF_F_GEN_CSUM) && !((dev->features & NETIF_F_IP_CSUM) && skb->protocol == htons(ETH_P_IP)) && !((dev->features & NETIF_F_IPV6_CSUM) && skb->protocol == htons(ETH_P_IPV6))) if (skb_checksum_help(skb)) goto out_kfree_skb; } gso: spin_lock_prefetch(&dev->queue_lock); /* Disable soft irqs for various locks below. Also * stops preemption for RCU. */ rcu_read_lock_bh(); /* Updates of qdisc are serialized by queue_lock. * The struct Qdisc which is pointed to by qdisc is now a * rcu structure - it may be accessed without acquiring * a lock (but the structure may be stale.) The freeing of the * qdisc will be deferred until it's known that there are no * more references to it. * * If the qdisc has an enqueue function, we still need to * hold the queue_lock before calling it, since queue_lock * also serializes access to the device queue. */ //獲取到Qdisc q = rcu_dereference(dev->qdisc); #ifdef CONFIG_NET_CLS_ACT skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS); #endif //如果裝置定義了輸出排隊規則,說明支援Qos,需要使用流量控制機制進行資料包傳送 //實際上,從目前的程式碼實現上看,排隊規則是一定會設定的,層二框架總是會為裝置指定一個預設的排隊規則 if (q->enqueue) { //持有Qdisc的保護鎖 spin_lock(&dev->queue_lock); q = dev->qdisc; if (q->enqueue) { /* reset queue_mapping to zero */ //這步忽略 skb_set_queue_mapping(skb, 0); //把該資料包入佇列,具體何時傳送由流量控制機制決定 rc = q->enqueue(skb, q); //呼叫流控機制傳送。可以這麼理解這次傳送,這裡僅僅是一次嘗試,能成功固然是好,如果 //不滿足傳送條件(流控限速、佇列狀態不對等等),也沒有關係,還會由傳送軟中斷髮送 qdisc_run(dev); spin_unlock(&dev->queue_lock); //根據入佇列的結果返回是否傳送成功。再次強調,入佇列只是代表網路裝置接收了這包資料, //並不能說明該資料已經被髮送了,傳送時機以及傳送哪一包資料由流量控制機制決定 rc = rc == NET_XMIT_BYPASS ? NET_XMIT_SUCCESS : rc; goto out; } spin_unlock(&dev->queue_lock); } /* The device has no queue. Common case for software devices: loopback, all the sorts of tunnels... Really, it is unlikely that netif_tx_lock protection is necessary here. (f.e. loopback and IP tunnels are clean ignoring statistics counters.) However, it is possible, that they rely on protection made by us here. Check this and shot the lock. It is not prone from deadlocks. Either shot noqueue qdisc, it is even simpler 8) */ //這裡處理網路裝置沒有開啟流量控制的情形。這時只能嘗試直接傳送,如果傳送時機不對 //或者是別的其它導致沒有傳送成功,那麼只能丟棄資料包,因為裝置沒有佇列儲存資料 //裝置必須已經別開啟,即執行過dev_open() if (dev->flags & IFF_UP) { int cpu = smp_processor_id(); /* ok because BHs are off */ //dev_queue_xmit()函式在同一CPU上不能被遞迴呼叫 if (dev->xmit_lock_owner != cpu) { HARD_TX_LOCK(dev, cpu); //裝置狀態ok,可以傳送資料 if (!netif_queue_stopped(dev) && !netif_subqueue_stopped(dev, skb)) { rc = 0; //呼叫驅動提供的dev_hard_start_xmit()介面傳送資料,此外還處理GSO的情況 if (!dev_hard_start_xmit(skb, dev)) { HARD_TX_UNLOCK(dev); goto out; } } HARD_TX_UNLOCK(dev); if (net_ratelimit()) printk(KERN_CRIT "Virtual device %s asks to " "queue packet!\n", dev->name); } else { /* Recursion is detected! It is possible, * unfortunately */ if (net_ratelimit()) printk(KERN_CRIT "Dead loop on virtual device " "%s, fix it urgently!\n", dev->name); } } rc = -ENETDOWN; rcu_read_unlock_bh(); out_kfree_skb: kfree_skb(skb); return rc; out: rcu_read_unlock_bh(); return rc; }

從dev_queue_xmit()的實現來看,傳送過程根據裝置是否有佇列(即有流量控制機制)分為兩種情況,下面分別看這兩種情況的傳送過程。

1.1 兩種情況的分別

可以看到,區別使用哪一種傳送過程很簡單,就是根據dev->qdics中是否指定了入佇列函式enqueue(),如果指定了就是有佇列傳送,要使用流量控制機制;否則就是無佇列傳送。

1.2 RCU鎖

這裡還要注意的是在整個dev_queue_xmit()期間,使用了rcu_read_lock_bh()和rcu_read_unlock_bh()保護了對dev->qdics指標的訪問,確保在函式執行期間該指標不會被修改。修改這個指標的地方見其它筆記介紹。

2. 有佇列裝置傳送資料

這種情況下,首先呼叫qdisc->enqueue()將當前要傳送的資料包入佇列,然後呼叫qdisc_run()函式。enqueue()並不會真正傳送,而且該函式與流量控制機制強相關,不同的流控機制使用的入佇列函式不一樣,我們暫時不分析該函式的實現,這裡重點看下瑞佇列之後的qdisc_run()到底幹了些什麼。

2.1 qdisc_run()

static inline void qdisc_run(struct net_device *dev)
{
	//1. 裝置的傳送佇列開啟(__LINK_STATE_XOFF標誌位沒有設定)
	//2. 裝置沒有被其它CPU排程傳送(__LINK_STATE_QDISC_RUNNING標誌位沒有置位)
	//如果滿足上述兩個條件,那麼設定排程標記,然後呼叫__qdisc_run()
	if (!netif_queue_stopped(dev) &&
	    !test_and_set_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))
		__qdisc_run(dev);
}

qdisc_run()只是一個包裹函式,判斷條件後呼叫__qdisc_run()。

void __qdisc_run(struct net_device *dev)
{
	unsigned long start_time = jiffies;

	while (qdisc_restart(dev)) {
		//如果裝置的傳送佇列被關閉了,結束本次傳送
		if (netif_queue_stopped(dev))
			break;

		/*
		 * Postpone processing if
		 * 1. another process needs the CPU;
		 * 2. we've been doing it for too long.
		 */
		//如註釋所述,這裡是為了避免傳送過程持續太長。這種場景下,
		//裝置還有資料要發,所以呼叫netif_schedule()啟用傳送軟中斷繼續處理
		if (need_resched() || jiffies != start_time) {
			netif_schedule(dev);
			break;
		}
	}
	//退出該函式時清除__LINK_STATE_QDISC_RUNNING標誌位
	clear_bit(__LINK_STATE_QDISC_RUNNING, &dev->state);
}

同樣的,核心在qdisc_restart()的實現上。

/*
 * NOTE: Called under dev->queue_lock with locally disabled BH.
 *
 * __LINK_STATE_QDISC_RUNNING guarantees only one CPU can process this
 * device at a time. dev->queue_lock serializes queue accesses for
 * this device AND dev->qdisc pointer itself.
 *
 *  netif_tx_lock serializes accesses to device driver.
 *
 *  dev->queue_lock and netif_tx_lock are mutually exclusive,
 *  if one is grabbed, another must be free.
 *
 * Note, that this procedure can be called by a watchdog timer
 *
 * Returns to the caller:
 *				0  - queue is empty or throttled.
 *				>0 - queue is not empty.
 *
 */
//仔細看註釋
static inline int qdisc_restart(struct net_device *dev)
{
	struct Qdisc *q = dev->qdisc;
	struct sk_buff *skb;
	int ret = NETDEV_TX_BUSY;

	//從傳送佇列中出佇列一個數據包
	if (unlikely((skb = dev_dequeue_skb(dev, q)) == NULL))
		return 0;

	//關於鎖見後面解釋
	spin_unlock(&dev->queue_lock);

	HARD_TX_LOCK(dev, smp_processor_id());
	//只有配置支援使用多個傳送佇列的時候該函式才可能返回非0值,這裡不考慮
	//呼叫dev_hard_start_xmit()使用驅動提供的傳送回調發送這包資料
	if (!netif_subqueue_stopped(dev, skb))
		ret = dev_hard_start_xmit(skb, dev);
	HARD_TX_UNLOCK(dev);

	spin_lock(&dev->queue_lock);
	q = dev->qdisc;
	//根據傳送返回結果進行處理
	switch (ret) {
	case NETDEV_TX_OK:
		//傳送成功,返回當前傳送佇列中剩餘資料包數目
		ret = qdisc_qlen(q);
		break;
	case NETDEV_TX_LOCKED:
		//裝置持鎖失敗,那麼進行CPU衝突相關處理。主要是進行統計,並將
		//資料包重新入佇列,然後返回佇列中剩餘資料包數目
		ret = handle_dev_cpu_collision(skb, dev, q);
		break;
	default:
		/* Driver returned NETDEV_TX_BUSY - requeue skb */
		if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))
			printk(KERN_WARNING "BUG %s code %d qlen %d\n",
			       dev->name, ret, q->q.qlen);
		//其它情況都屬於本次傳送失敗,那麼將資料包重新入佇列,然後返回佇列中剩餘資料包數目
		ret = dev_requeue_skb(skb, dev, q);
		break;
	}
	return ret;
}

2.1.1 鎖

有佇列裝置傳送過程還涉及另外兩個鎖:dev_queue_lock和HARD_TX_LOCK().

自旋鎖dev_queue_lock用於控制同時只有一個流程在呼叫裝置的流控機制傳送,即保證入佇列、出佇列的操作序列化。

HARD_TX_LOCK()持有期間,要呼叫驅動提供的傳送回撥進行傳送,這個過程要操作硬體,相對來講比較耗時。為了保證:

1) 傳送過程必須序列化,不能同時有兩個流程呼叫驅動提供的傳送回撥;
2) 裝置驅動傳送過程中不能休眠;
3) 裝置驅動傳送期間,可以執行入佇列操作。

框架設計了這個HARD_TX_LOCK(),先來看看這個鎖的操作:

#define HARD_TX_LOCK(dev, cpu) {			\
	if ((dev->features & NETIF_F_LLTX) == 0) {	\
		__netif_tx_lock(dev, cpu);			\
	}						\
}

#define HARD_TX_UNLOCK(dev) {				\
	if ((dev->features & NETIF_F_LLTX) == 0) {	\
		netif_tx_unlock(dev);			\
	}						\
}

/**
 *	netif_tx_lock - grab network device transmit lock
 *	@dev: network device
 *	@cpu: cpu number of lock owner
 *
 * Get network device transmit lock
 */
static inline void __netif_tx_lock(struct net_device *dev, int cpu)
{
	spin_lock(&dev->_xmit_lock);
	dev->xmit_lock_owner = cpu;
}

static inline void netif_tx_unlock(struct net_device *dev)
{
	dev->xmit_lock_owner = -1;
	spin_unlock(&dev->_xmit_lock);
}

如果驅動程式在dev->feature欄位中設定了NETIF_F_LLTX標記,表示驅動自己會實現這種鎖,這時驅動會保證在提供的傳送回撥內部實現時,正確的持有該鎖。

如果驅動沒有設定NETIF_F_LLTX標記,那麼就是用net_device中提供的自旋鎖__xmit_lock,持有該鎖時,將dev->xmit_lock_owner為當前持鎖的CPU的ID。

需要強調下這種設計為什麼能夠實現上面列舉的三點:首先自旋鎖(無論是dev->_xmit_lock還是驅動自己提供的)可以保證1和2;因為在HARD_TX_LOCK()之前已經釋放了dev->dev_queue_lock,所以可以執行入佇列操作,但是因為沒有清除標記__LINK_STATE_QDISC_RUNNING標誌位,所以其它CPU根本無法呼叫qdisc_run()啟動流控機制來執行出佇列操作。

2.2 dev_hard_start_xmit()

看上面程式碼,可以看到最終的傳送是dev_hard_start_xmit(),該函式處理GSO、支援勾包、呼叫驅動的傳送回調發送資料包。

int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev)
{
	if (likely(!skb->next)) {
		//這裡提供了一種在裝置介面層的勾包機制,向註冊在
		//ptype_all中的所有介面到傳送一份資料包
		if (!list_empty(&ptype_all))
			dev_queue_xmit_nit(skb, dev);

		if (netif_needs_gso(dev, skb)) {
			if (unlikely(dev_gso_segment(skb)))
				goto out_kfree_skb;
			if (skb->next)
				goto gso;
		}
		//呼叫驅動的傳送回調發送資料包
		return dev->hard_start_xmit(skb, dev);
	}

gso:
	//處理GSO情形
	do {
		struct sk_buff *nskb = skb->next;
		int rc;

		skb->next = nskb->next;
		nskb->next = NULL;
		//呼叫驅動的傳送回撥分別傳送每一片
		rc = dev->hard_start_xmit(nskb, dev);
		if (unlikely(rc)) {
			nskb->next = skb->next;
			skb->next = nskb;
			return rc;
		}
		if (unlikely((netif_queue_stopped(dev) ||
			     netif_subqueue_stopped(dev, skb)) &&
			     skb->next))
			return NETDEV_TX_BUSY;
	} while (skb->next);

	skb->destructor = DEV_GSO_CB(skb)->destructor;

out_kfree_skb:
	kfree_skb(skb);
	return 0;
}

2.3 啟用傳送軟中斷

在__qdisc_run()中看到,如果裝置還有資料要傳送,那麼就會呼叫netif_schedule(),看下這個過程的實現:

static inline void netif_schedule(struct net_device *dev)
{
	//傳送佇列沒有關閉的情況下執行排程
	if (!test_bit(__LINK_STATE_XOFF, &dev->state))
		__netif_schedule(dev);
}

void __netif_schedule(struct net_device *dev)
{
	//如果__LINK_STATE_SCHED沒有置位,說明裝置還沒有被髮送軟中斷排程,
	//那麼設定標誌位,然後將裝置放入當前CPU的輸出佇列中並激活軟中斷
	if (!test_and_set_bit(__LINK_STATE_SCHED, &dev->state)) {
		unsigned long flags;
		struct softnet_data *sd;

		local_irq_save(flags);
		//獲取當前CPU的收發佇列,將裝置接入該佇列
		sd = &__get_cpu_var(softnet_data);
		dev->next_sched = sd->output_queue;
		sd->output_queue = dev;
		//啟用傳送軟中斷
		raise_softirq_irqoff(NET_TX_SOFTIRQ);
		local_irq_restore(flags);
	}
}
EXPORT_SYMBOL(__netif_schedule);

這裡,output_queue的使用和接收過程中poll_list的使用完全相同。

3. 無佇列裝置傳送資料

看完有佇列傳送資料,再回頭看看dev_queue_xmit()中無佇列情況的處理,會發現所有的細節都已經涉及到,這裡不再贅述。

4. 傳送軟中斷net_tx_action()

從上面的程式碼中看到,當dev_queue_xmit()沒有將佇列中的資料傳送完畢時,就會啟用傳送軟中斷繼續傳送。

實際上,大多數的資料傳送都是由傳送軟中斷驅動的,如果一次傳送不完,會再次啟用,在下一次軟中斷處理中繼續傳送,直到所有資料都發送完畢。

static void net_tx_action(struct softirq_action *h)
{
	//獲取本CPU上面的傳送佇列
	struct softnet_data *sd = &__get_cpu_var(softnet_data);

	//completion_queue佇列的目的是讓驅動程式將資料傳送完畢後,如果想要快速退出,
	//延後執行資料包的free動作,那麼可以將待釋放的資料包放入該佇列,將釋放動作放到
	//傳送軟中斷中處理

	if (sd->completion_queue) {
		struct sk_buff *clist;

		//注意這種優化寫法:先關中斷,把資料從佇列中摘下來,再開中斷,由於free動作比較耗時,
		//而該佇列是所有裝置共享的,這種設計可以保證關中斷的時間儘可能的短
		local_irq_disable();
		clist = sd->completion_queue;
		sd->completion_queue = NULL;
		local_irq_enable();

		//釋放這些SKB
		while (clist) {
			struct sk_buff *skb = clist;
			clist = clist->next;
			//注意,放入完成佇列的SKB的引用計數應該已經不為0
			BUG_TRAP(!atomic_read(&skb->users));
			__kfree_skb(skb);
		}
	}

	//處理output_queue,嘗試傳送
	if (sd->output_queue) {
		struct net_device *head;

		//和上面completetion_queue類似的處理辦法,只是這裡的佇列元素是net_device
		local_irq_disable();
		head = sd->output_queue;
		sd->output_queue = NULL;
		local_irq_enable();

		while (head) {
			struct net_device *dev = head;
			//head指向待輪詢列表的下一個裝置
			head = head->next_sched;

			smp_mb__before_clear_bit();
			//清除當前裝置的正在輪詢狀態
			clear_bit(__LINK_STATE_SCHED, &dev->state);
			//嘗試通過流量控制機制傳送,這裡之所以能肯定使用了流量控制機制,是因為如果不使用,
			//那麼在dev_queue_xmit()中就直接呼叫驅動的傳送介面傳送了,根本不會啟用傳送軟中斷
			//另外,注意這裡為了儘可能的塊,使用的是trylock()版本的持鎖方式,也表達了這裡只是
			//盡最大努力傳送的意思,如果本次不能傳送,那麼就再次排程,等待下次軟中斷機會
			if (spin_trylock(&dev->queue_lock)) {
				qdisc_run(dev);
				spin_unlock(&dev->queue_lock);
			} else {
				//本次未進行傳送,再次將該裝置加入到output_queue,並激活傳送軟中斷
				netif_schedule(dev);
			}
		}
	}
}

關於傳送佇列的描述,可以參考裝置介面層之資料包接收.

5. 小結

下面來回顧下整個裝置介面層的整個資料包傳送過程:

  1. 高層協議將資料包準備好後,呼叫dev_queue_xmit()將資料包傳送給裝置介面層,這裡面有個非常重要的前提是高層協議已經確定裝置要從哪一個網路裝置發出(設定了skb->dev欄位);
  2. 在dev_queue_xmit()中處理線性化、校驗和等處理後,根據裝置上是否配置了傳送佇列,將傳送過程分為兩種:有佇列傳送和無佇列傳送,下面我們在只看有佇列傳送;
  3. 對於有佇列傳送,先將資料包入佇列,然後呼叫qdisc_run()啟動流量控制機制進行一次資料包傳送嘗試,如果傳送失敗,會將該裝置加入到當前CPU的傳送輪詢佇列output_queue中,然後啟用網路傳送軟中斷;
  4. 在網路傳送軟中斷中,依次處理髮送輪詢佇列ouput_queue中的待發送裝置,處理時同樣是啟動流量控制機制進行資料包傳送,如果一次軟中斷中無法傳送完畢,那麼就繼續啟動下一次軟中斷,如此反覆,知道該裝置的傳送佇列情況為止。