Linux Kernel Packet Forwarding Flow (3): Analysis of NIC Frame Reception
阿新 · Published: 2019-01-01
Each CPU has its own queue for received frames and its own data structure for handling ingress and egress traffic, so no locking is needed between CPUs. This per-CPU queue structure is softnet_data (defined in include/linux/netdevice.h):
/*
 * Incoming packets are placed on per-cpu queues so that
 * no locking is needed.
 */
struct softnet_data {
	struct Qdisc		*output_queue;		/* list of devices that have something to transmit */
	struct sk_buff_head	input_pkt_queue;	/* queue of incoming frames (non-NAPI path) */
	struct list_head	poll_list;		/* doubly linked list of devices with input frames waiting to be processed */
	struct sk_buff		*completion_queue;	/* list of buffers that have been transmitted successfully and can be freed */
	struct napi_struct	backlog;		/* the backlog "device" polled on behalf of non-NAPI drivers */
};
The fields of this structure are used for both transmission and reception; in other words, both the NET_RX_SOFTIRQ and NET_TX_SOFTIRQ softirqs reference it. Ingress frames are queued on input_pkt_queue (NAPI devices work differently).
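As a small illustration (a sketch only, not code from the kernel; xx_show_backlog is a made-up helper), code running on a given CPU reaches its own softnet_data instance through the per-CPU accessor, which is exactly how netif_rx() below gets at the queue:

static void xx_show_backlog(void)
{
	struct softnet_data *sd;
	unsigned long flags;

	local_irq_save(flags);			/* netif_rx() also disables local interrupts first */
	sd = &__get_cpu_var(softnet_data);	/* this CPU's private instance, hence no inter-CPU locking */
	printk(KERN_DEBUG "backlog on this cpu: %u frames\n",
	       sd->input_pkt_queue.qlen);
	local_irq_restore(flags);
}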
A non-NAPI driver raises an interrupt for every frame it receives, so under heavy traffic a large share of CPU time goes to servicing interrupts and is effectively wasted; a NAPI driver mixes interrupts with polling and performs better under high load. softnet_data itself is initialized in net_dev_init():

/*
 * This is called single threaded during boot, so no need
 * to take the rtnl semaphore.
 */
static int __init net_dev_init(void)
{
	int i, rc = -ENOMEM;
	......
	/*
	 * Initialise the packet receive queues.
	 */
	for_each_possible_cpu(i) {
		struct softnet_data *queue;

		queue = &per_cpu(softnet_data, i);
		skb_queue_head_init(&queue->input_pkt_queue);
		queue->completion_queue = NULL;
		INIT_LIST_HEAD(&queue->poll_list);

		queue->backlog.poll = process_backlog;
		queue->backlog.weight = weight_p;
		queue->backlog.gro_list = NULL;
		queue->backlog.gro_count = 0;
	}
	......
	open_softirq(NET_TX_SOFTIRQ, net_tx_action);
	open_softirq(NET_RX_SOFTIRQ, net_rx_action);
	......
}
The main idea of NAPI is to mix interrupts with polling rather than relying on a purely interrupt-driven model: when a new frame arrives, further RX interrupts are disabled and the whole ingress queue is processed in one polling pass. From the kernel's point of view, NAPI reduces CPU load because far fewer interrupts are raised.
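In outline, and only as a hedged sketch (struct xx_nic, xx_irq_is_ours() and xx_disable_rx_irq() are placeholders, not a real driver's API), a NAPI driver's interrupt handler acknowledges the interrupt, masks further RX interrupts on the device, and defers the rest to its poll routine through the NAPI scheduling helpers; the real e100 handler quoted later in this article has exactly this shape:

static irqreturn_t xx_intr(int irq, void *dev_id)
{
	struct net_device *netdev = dev_id;
	struct xx_nic *nic = netdev_priv(netdev);	/* hypothetical private data */

	if (!xx_irq_is_ours(nic))			/* placeholder: read/ack the device's IRQ status */
		return IRQ_NONE;

	if (napi_schedule_prep(&nic->napi)) {
		xx_disable_rx_irq(nic);			/* placeholder: mask RX interrupts on the NIC */
		__napi_schedule(&nic->napi);		/* queue this device for polling by net_rx_action() */
	}
	return IRQ_HANDLED;
}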
The xx_rx() function of a non-NAPI driver typically looks like this:
The first step is to allocate a buffer to hold the packet; note that the allocation function (dev_alloc_skb) needs to know the data length.

void xx_rx()
{
	struct sk_buff *skb;

	skb = dev_alloc_skb(pkt_len + 5);
	if (skb != NULL) {
		skb_reserve(skb, 2);				/* Align IP on 16 byte boundaries */
		memcpy(skb_put(skb, pkt_len), pkt, pkt_len);	/* copy the packet data into the skb */
		skb->protocol = eth_type_trans(skb, dev);
		netif_rx(skb);
	}
}
The second step copies the packet data into the buffer; skb_put() updates the end-of-data pointer inside the buffer and returns a pointer to the newly created space.
The third step extracts the protocol identifier (eth_type_trans) and any other needed information.
Finally, netif_rx(skb) is called for further processing. This function is defined in net/core/dev.c:
int netif_rx(struct sk_buff *skb)
{
	struct softnet_data *queue;
	unsigned long flags;

	/* if netpoll wants it, pretend we never saw it */
	if (netpoll_rx(skb))
		return NET_RX_DROP;

	if (!skb->tstamp.tv64)
		net_timestamp(skb);

	/*
	 * The code is rearranged so that the path is the most
	 * short when CPU is congested, but is still operating.
	 */
	local_irq_save(flags);

	queue = &__get_cpu_var(softnet_data);
	__get_cpu_var(netdev_rx_stat).total++;
	if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {	/* is there still room? (netdev_max_backlog: 300 in older kernels, 1000 in later ones) */
		/* The softirq is raised (via napi_schedule()) only when the queue is
		 * empty; if the queue is not empty the softirq has already been
		 * raised, so there is no need to raise it again. */
		if (queue->input_pkt_queue.qlen) {
enqueue:
			__skb_queue_tail(&queue->input_pkt_queue, skb);	/* the key step: append the skb to input_pkt_queue */
			local_irq_restore(flags);
			return NET_RX_SUCCESS;
		}

		napi_schedule(&queue->backlog);	/* raise the softirq */
		goto enqueue;
	}

	__get_cpu_var(netdev_rx_stat).dropped++;
	local_irq_restore(flags);

	kfree_skb(skb);
	return NET_RX_DROP;
}
EXPORT_SYMBOL(netif_rx);
static inline void napi_schedule(struct napi_struct *n)
{
	if (napi_schedule_prep(n))
		__napi_schedule(n);
}

void __napi_schedule(struct napi_struct *n)
{
	unsigned long flags;

	local_irq_save(flags);
	list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);	/* add the device to the poll list so its pending frames get processed */
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);	/* finally raise the softirq */
	local_irq_restore(flags);
}
EXPORT_SYMBOL(__napi_schedule);
At this point the top half of the interrupt is complete and the remaining work is handed to the bottom half: napi_schedule(&queue->backlog) links the per-CPU backlog napi_struct, which stands in for the non-NAPI devices that have frames waiting, onto softnet_data's poll_list and then raises the softirq, leaving the actual packet processing to the bottom half.
A NAPI device, by contrast, raises the softirq directly when it receives data; it does not need netif_rx() to place the frame on a receive queue first. For example, the e100 hard-interrupt handler is:
static irqreturn_t e100_intr(int irq, void *dev_id)
{
	struct net_device *netdev = dev_id;
	struct nic *nic = netdev_priv(netdev);
	u8 stat_ack = ioread8(&nic->csr->scb.stat_ack);

	DPRINTK(INTR, DEBUG, "stat_ack = 0x%02X\n", stat_ack);

	if (stat_ack == stat_ack_not_ours ||	/* Not our interrupt */
	   stat_ack == stat_ack_not_present)	/* Hardware is ejected */
		return IRQ_NONE;

	/* Ack interrupt(s) */
	iowrite8(stat_ack, &nic->csr->scb.stat_ack);

	/* We hit Receive No Resource (RNR); restart RU after cleaning */
	if (stat_ack & stat_ack_rnr)
		nic->ru_running = RU_SUSPENDED;

	if (likely(napi_schedule_prep(&nic->napi))) {
		e100_disable_irq(nic);
		__napi_schedule(&nic->napi);	/* raise the softirq here */
	}

	return IRQ_HANDLED;
}
As shown earlier, net_dev_init() registers net_rx_action() as the handler for the receive softirq, so that function is invoked once the softirq is raised. net_rx_action() is:
static void net_rx_action(struct softirq_action *h)
{
	struct list_head *list = &__get_cpu_var(softnet_data).poll_list;
	unsigned long time_limit = jiffies + 2;
	int budget = netdev_budget;
	void *have;

	local_irq_disable();

	while (!list_empty(list)) {
		struct napi_struct *n;
		int work, weight;

		/* If softirq window is exhuasted then punt.
		 * Allow this to run for 2 jiffies since which will allow
		 * an average latency of 1.5/HZ.
		 */
		if (unlikely(budget <= 0 || time_after(jiffies, time_limit)))	/* budget/time exhausted while frames may remain queued: jump to softnet_break, which reschedules the softirq */
			goto softnet_break;

		local_irq_enable();

		/* Even though interrupts have been re-enabled, this
		 * access is safe because interrupts can only add new
		 * entries to the tail of this list, and only ->poll()
		 * calls can remove this head entry from the list.
		 */
		n = list_entry(list->next, struct napi_struct, poll_list);

		have = netpoll_poll_lock(n);

		weight = n->weight;

		/* This NAPI_STATE_SCHED test is for avoiding a race
		 * with netpoll's poll_napi(). Only the entity which
		 * obtains the lock and sees NAPI_STATE_SCHED set will
		 * actually make the ->poll() call. Therefore we avoid
		 * accidently calling ->poll() when NAPI is not scheduled.
		 */
		work = 0;
		if (test_bit(NAPI_STATE_SCHED, &n->state)) {
			work = n->poll(n, weight);	/* run the device's poll function; it returns the number of frames processed */
			trace_napi_poll(n);
		}

		WARN_ON_ONCE(work > weight);

		budget -= work;

		local_irq_disable();

		/* Drivers must not modify the NAPI state if they
		 * consume the entire weight. In such cases this code
		 * still "owns" the NAPI instance and therefore can
		 * move the instance around on the list at-will.
		 */
		if (unlikely(work == weight)) {	/* the full weight was used, so frames are probably still pending: keep the device scheduled and move it to the tail (a driver that drains its queue calls napi_complete() itself) */
			if (unlikely(napi_disable_pending(n))) {
				local_irq_enable();
				napi_complete(n);
				local_irq_disable();
			} else
				list_move_tail(&n->poll_list, list);
		}

		netpoll_poll_unlock(have);
	}
out:
	local_irq_enable();

#ifdef CONFIG_NET_DMA
	/*
	 * There may not be any more sk_buffs coming right now, so push
	 * any pending DMA copies to hardware
	 */
	dma_issue_pending_all();
#endif

	return;

softnet_break:
	__get_cpu_var(netdev_rx_stat).time_squeeze++;
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);
	goto out;
}
As the code shows, the bottom half's main job is to walk the list of devices that have frames waiting and run each device's poll function. For non-NAPI devices the poll function was set to process_backlog() in net_dev_init().
process_backlog() is defined as:
static int process_backlog(struct napi_struct *napi, int quota)
{
	int work = 0;
	struct softnet_data *queue = &__get_cpu_var(softnet_data);
	unsigned long start_time = jiffies;

	napi->weight = weight_p;
	do {
		struct sk_buff *skb;

		local_irq_disable();
		skb = __skb_dequeue(&queue->input_pkt_queue);
		if (!skb) {
			__napi_complete(napi);
			local_irq_enable();
			break;
		}
		local_irq_enable();

		netif_receive_skb(skb);
	} while (++work < quota && jiffies == start_time);

	return work;
}
For a NAPI device the driver must supply its own poll method. With the napi_struct interface used by the kernel version quoted above, it has the following prototype:

int (*poll)(struct napi_struct *napi, int budget);
The method is registered at initialization time with:
netif_napi_add(netdev, &nic->napi, xx_poll, XX_NAPI_WEIGHT);
A NAPI driver's poll method is typically implemented as follows (this code is borrowed from Linux Device Drivers and targets the older NAPI interface, so it does not match the prototype above exactly):
static int xx_poll(struct net_device *dev, int *budget)
{
	int npackets = 0, quota = min(dev->quota, *budget);
	struct sk_buff *skb;
	struct xx_priv *priv = netdev_priv(dev);
	struct xx_packet *pkt;

	while (npackets < quota && priv->rx_queue) {
		pkt = xx_dequeue_buf(dev);
		skb = dev_alloc_skb(pkt->datalen + 2);
		if (! skb) {
			if (printk_ratelimit())
				printk(KERN_NOTICE "xx: packet dropped\n");
			priv->stats.rx_dropped++;
			xx_release_buffer(pkt);
			continue;
		}
		memcpy(skb_put(skb, pkt->datalen), pkt->data, pkt->datalen);
		skb->dev = dev;
		skb->protocol = eth_type_trans(skb, dev);
		skb->ip_summed = CHECKSUM_UNNECESSARY; /* don't check it */
		netif_receive_skb(skb);

		/* Maintain stats */
		npackets++;
		priv->stats.rx_packets++;
		priv->stats.rx_bytes += pkt->datalen;
		xx_release_buffer(pkt);
	}

	/* If we processed all packets, we're done; tell the kernel and reenable ints */
	*budget -= npackets;
	dev->quota -= npackets;
	if (! priv->rx_queue) {
		netif_rx_complete(dev);
		xx_rx_ints(dev, 1);
		return 0;
	}

	/* We couldn't process everything. */
	return 1;
}
A NAPI driver therefore provides its own poll function and its own private receive queue.
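Because netif_napi_add() above registers a poll method with the napi_struct-based prototype, the same logic as the LDD example would look roughly like the sketch below (struct xx_nic, xx_rx_frame() and xx_enable_rx_irq() are placeholders, not a real driver's API):

static int xx_poll(struct napi_struct *napi, int budget)
{
	struct xx_nic *nic = container_of(napi, struct xx_nic, napi);
	int work_done = 0;

	while (work_done < budget) {
		struct sk_buff *skb = xx_rx_frame(nic);	/* placeholder: dequeue one received frame, NULL if none */
		if (!skb)
			break;
		skb->protocol = eth_type_trans(skb, nic->netdev);
		netif_receive_skb(skb);			/* hand the frame to the protocol layers */
		work_done++;
	}

	/* If the queue drained before the budget ran out, leave polled mode and
	 * unmask RX interrupts; otherwise work_done == budget and net_rx_action()
	 * keeps this device on the poll list. */
	if (work_done < budget) {
		napi_complete(napi);
		xx_enable_rx_irq(nic);			/* placeholder: re-enable RX interrupts */
	}
	return work_done;
}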
Whether a driver is NAPI or non-NAPI, its poll function ends up calling netif_receive_skb(skb) for every received frame. That function hands the skb to each registered protocol handler, after which the data is processed by the Linux kernel protocol stack.
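For reference, a hedged sketch of how such a protocol handler is registered (xx_rcv, xx_proto and the ETH_P_ALL choice are illustrative, not from the article): handlers describe themselves with struct packet_type and attach via dev_add_pack(), which populates the lists that netif_receive_skb() walks when delivering frames.

static int xx_rcv(struct sk_buff *skb, struct net_device *dev,
		  struct packet_type *pt, struct net_device *orig_dev)
{
	/* inspect or consume the frame here */
	kfree_skb(skb);
	return NET_RX_SUCCESS;
}

static struct packet_type xx_proto = {
	.type = cpu_to_be16(ETH_P_ALL),	/* match all protocols; a real handler uses e.g. ETH_P_IP */
	.func = xx_rcv,
};

/* dev_add_pack(&xx_proto) registers the handler;
 * dev_remove_pack(&xx_proto) removes it again. */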