裝置介面層之流量控制
在裝置介面層之資料包傳送中有介紹過,資料包傳送分為有佇列傳送和無佇列傳送,並且有佇列傳送依賴於流量控制機制,這篇筆記就來看下流量控制相關的內容。
流量控制機制的核心是為傳送過程建立排隊規則,讓傳送的每一包資料先入佇列,然後再出佇列,真正最終傳送的順序就是出佇列的順序,而是用什麼樣的佇列,以及佇列內部如何控制可以有上層通過tc工具進行配置,對於核心的傳送框架,只規定了標準的入隊和出隊介面進行了標準化,這點在裝置介面層之資料包傳送已經詳細介紹過了。
注:這裡不談流量控制機制相關的設計原理,這個概念太大了,這裡講不清楚,相關理論知識參考資料書,這裡只關注程式碼實現相關的內容。
1. 資料結構
在struct net_device中,與排隊規則相關的幾個欄位如下:
struct net_device
{
...
/*
* Cache line mostly used on queue transmit path (qdisc)
*/
/* device queue lock */
//流量控制排隊規則的保護鎖,入隊出隊操作需要序列化
spinlock_t queue_lock ____cacheline_aligned_in_smp;
//當前該裝置生效的根排隊規則,當排隊規則啟用時,將qdisc_sleeping的值設定到qdisc
struct Qdisc *qdisc;
//當前該裝置被配置的根排隊規則
struct Qdisc *qdisc_sleeping;
//裝置可以關聯多個排隊規則,這些規則用連結串列組織,qdisc_list為表頭
struct list_head qdisc_list;
//每個傳送佇列所允許的最大包數,需要由各個網路裝置驅動程式自行初始化該成員
unsigned long tx_queue_len; /* Max frames per queue allowed */
...
}
1.2 排隊規則Qdisc
並非每個排隊規則都需要實現該結構中的所有欄位,實際上絕大多數字段只有那些高階的排隊規則才需要。參考下面的noop和pfifo_fast就清楚了。
struct Qdisc
{
//入隊和出隊介面
int (* enqueue)(struct sk_buff *skb, struct Qdisc *dev);
struct sk_buff * (*dequeue)(struct Qdisc *dev);
unsigned flags;
#define TCQ_F_BUILTIN 1
#define TCQ_F_THROTTLED 2
#define TCQ_F_INGRESS 4
//該結構需要32位元組對齊,而分配的記憶體可能不是32位元組對齊的,所以可能會由padding,該欄位記錄padding大小
int padded;
//排隊規則操作介面,流量控制框架會呼叫這裡的介面操作根排隊規則,所以排隊規則必須設定該指標
struct Qdisc_ops *ops;
//控制代碼,該控制代碼在同一網路裝置內部可以唯一標識排隊規則
u32 handle;
//父結點控制代碼
u32 parent;
//引用計數
atomic_t refcnt;
//用於管理當前佇列中的所有資料包的表頭結點
struct sk_buff_head q;
//該佇列所屬網路裝置
struct net_device *dev;
//一個網路裝置可能會有多個排隊規則,該成員用於將該排隊規則鏈如網路裝置的排隊規則連結串列中
struct list_head list;
struct gnet_stats_basic bstats;
struct gnet_stats_queue qstats;
struct gnet_stats_rate_est rate_est;
spinlock_t *stats_lock;
struct rcu_head q_rcu;
int (*reshape_fail)(struct sk_buff *skb,
struct Qdisc *q);
/* This field is deprecated, but it is still used by CBQ
* and it will live until better solution will be invented.
*/
struct Qdisc *__parent;
};
1.3 排隊規則操作函式
每一個排隊規則都必須定義一個Qdisc_ops,並將其繫結到Qdisc中,該結構包含了一組回撥函式,實際使用情形這樣的:
- 各個排隊規則模組向核心註冊自己的Qdisc_ops結構
- 當用戶空間通過tc指令為網路裝置設定排隊規則時,核心從已註冊的排隊規則中查詢
- 一旦找到,就將建立一個Qdisc結構,並將找到的Qdisc_ops中函式指標賦值到排隊規則的對應函式指標中(入隊和出隊操作)
struct Qdisc_ops
{
//系統會將所有註冊的Qdisc_ops組織成連結串列
struct Qdisc_ops *next;
//對應的類操作函式集
const struct Qdisc_class_ops *cl_ops;
//名字
char id[IFNAMSIZ];
int priv_size;
//入隊和出隊操作
int (*enqueue)(struct sk_buff *, struct Qdisc *);
struct sk_buff * (*dequeue)(struct Qdisc *);
//出隊後,如果資料包傳送失敗,會將該資料包重新入隊
int (*requeue)(struct sk_buff *, struct Qdisc *);
unsigned int (*drop)(struct Qdisc *);
//安裝排隊規則時回撥
int (*init)(struct Qdisc *, struct nlattr *arg);
//當排隊規則排移除時回撥
void (*reset)(struct Qdisc *);
void (*destroy)(struct Qdisc *);
int (*change)(struct Qdisc *, struct nlattr *arg);
int (*dump)(struct Qdisc *, struct sk_buff *);
int (*dump_stats)(struct Qdisc *, struct gnet_dump *);
struct module *owner;
};
2. 排隊規則的建立
這裡來看看在裝置狀態變化過程中,排隊規則相關的程式碼流程。
2.1 設備註冊時
在設備註冊register_netdevice()中,有呼叫dev_init_scheduler()對net_device中的排隊規則相關欄位進行初始化。
void dev_init_scheduler(struct net_device *dev)
{
qdisc_lock_tree(dev);
dev->qdisc = &noop_qdisc;
dev->qdisc_sleeping = &noop_qdisc;
INIT_LIST_HEAD(&dev->qdisc_list);
qdisc_unlock_tree(dev);
//該timer用於檢測傳送是否有異常,實際上就是如果一段時間沒有資料傳送就回調驅動提供的tx_timeout()
setup_timer(&dev->watchdog_timer, dev_watchdog, (unsigned long)dev);
}
初始化時,設定的根排隊規則是noop_qdisc,從下面的程式碼中可以看到noop實際上是一種垃圾回收佇列,其入隊操作會釋放傳入的資料包,並且返回裝置忙錯誤碼;其出對列操作返回的資料包永遠為空。
/* "NOOP" scheduler: the best scheduler, recommended for all interfaces
under all circumstances. It is difficult to invent anything faster or
cheaper.
*/
static int noop_enqueue(struct sk_buff *skb, struct Qdisc * qdisc)
{
//直接Free,並且返回繁忙標記
kfree_skb(skb);
return NET_XMIT_CN;
}
static struct sk_buff *noop_dequeue(struct Qdisc * qdisc)
{
//因為不存在入隊,所以這裡返回NULL也正常
return NULL;
}
static int noop_requeue(struct sk_buff *skb, struct Qdisc* qdisc)
{
if (net_ratelimit())
printk(KERN_DEBUG "%s deferred output. It is buggy.\n",
skb->dev->name);
kfree_skb(skb);
return NET_XMIT_CN;
}
struct Qdisc_ops noop_qdisc_ops __read_mostly = {
.id = "noop",
.priv_size = 0,
.enqueue = noop_enqueue,
.dequeue = noop_dequeue,
.requeue = noop_requeue,
.owner = THIS_MODULE,
};
struct Qdisc noop_qdisc = {
.enqueue = noop_enqueue,
.dequeue = noop_dequeue,
.flags = TCQ_F_BUILTIN,
//ops和list根本就不會被用到
.ops = &noop_qdisc_ops,
.list = LIST_HEAD_INIT(noop_qdisc.list),
};
從上面可以看到,註冊時指定的noop_disc根本就無法用來發送資料,所以在裝置被開啟時,還有相關的操作。
2.2 裝置開啟時
裝置開啟時,會呼叫dev_open(),在該函式中,會呼叫dev_activate()啟用排隊規則,如果當前配置的排隊的隊則是noop_qdisc,那麼會建立一個預設的pfifo_fast排隊規則。
void dev_activate(struct net_device *dev)
{
/* No queueing discipline is attached to device;
create default one i.e. pfifo_fast for devices,
which need queueing and noqueue_qdisc for
virtual interfaces
*/
//如果當前沒有配置過其它型別的排隊規則,則新建一個預設的
if (dev->qdisc_sleeping == &noop_qdisc) {
struct Qdisc *qdisc;
//最大佇列長度需要大於0,如果為0,那麼就無法傳送了,所以用noop_qdisc就好,沒有必要重新分配
if (dev->tx_queue_len) {
//建立的預設排隊規則為pfifo_fast,這是一種略微加強版的FIFO
qdisc = qdisc_create_dflt(dev, &pfifo_fast_ops,
TC_H_ROOT);
if (qdisc == NULL) {
printk(KERN_INFO "%s: activation failed\n", dev->name);
return;
}
//加入裝置的排隊規則連結串列中
list_add_tail(&qdisc->list, &dev->qdisc_list);
} else {
//如果驅動設定的最大佇列長度為0,那麼會指定一種無佇列排隊規則
qdisc = &noqueue_qdisc;
}
//新建的排隊規則賦值給qdisc_sleeping,該欄位代表了當前配置的根排隊規則
dev->qdisc_sleeping = qdisc;
}
//注意,因為上面並沒有操作dev->qdisc,所以並不需要持queue_lock
//如果當前網路鏈路不ok,那麼直接返回
if (!netif_carrier_ok(dev))
/* Delay activation until next carrier-on event */
return;
//網路鏈路ok,可以開啟排隊規則,所以將qdisc_sleeping的值賦給qdisc
spin_lock_bh(&dev->queue_lock);
rcu_assign_pointer(dev->qdisc, dev->qdisc_sleeping);
if (dev->qdisc != &noqueue_qdisc) {
//這裡的記錄jiffies好奇怪,這裡又沒有傳送資料包???
dev->trans_start = jiffies;
dev_watchdog_up(dev);
}
spin_unlock_bh(&dev->queue_lock);
}
2.2.1 建立預設排隊規則pfifo_fast
上面呼叫建立預設的排隊規則。
struct Qdisc * qdisc_create_dflt(struct net_device *dev, struct Qdisc_ops *ops,
unsigned int parentid)
{
struct Qdisc *sch;
//分配Qdisc結構,並進行一定的初始化
sch = qdisc_alloc(dev, ops);
if (IS_ERR(sch))
goto errout;
sch->stats_lock = &dev->queue_lock;
sch->parent = parentid;
//如果排隊規則自己也定義了init(),則回撥
if (!ops->init || ops->init(sch, NULL) == 0)
return sch;
//初始化失敗,釋放Qdisc後返回失敗
qdisc_destroy(sch);
errout:
return NULL;
}
EXPORT_SYMBOL(qdisc_create_dflt);
struct Qdisc *qdisc_alloc(struct net_device *dev, struct Qdisc_ops *ops)
{
void *p;
struct Qdisc *sch;
unsigned int size;
int err = -ENOBUFS;
//分配的記憶體大小要32位元組對齊
/* ensure that the Qdisc and the private data are 32-byte aligned */
size = QDISC_ALIGN(sizeof(*sch));
size += ops->priv_size + (QDISC_ALIGNTO - 1);
p = kzalloc(size, GFP_KERNEL);
if (!p)
goto errout;
//padded記錄了可能的填充
sch = (struct Qdisc *) QDISC_ALIGN((unsigned long) p);
sch->padded = (char *) sch - (char *) p;
//初始化Qdisc成員
INIT_LIST_HEAD(&sch->list);
skb_queue_head_init(&sch->q);
sch->ops = ops;
//Qdisc中的入隊和出隊就是排隊規則定義的入隊和出對
sch->enqueue = ops->enqueue;
sch->dequeue = ops->dequeue;
sch->dev = dev;
//排隊規則會持有一個裝置的引用計數
dev_hold(dev);
atomic_set(&sch->refcnt, 1);
return sch;
errout:
return ERR_PTR(-err);
}
2.3 裝置關閉時
裝置關閉時,會呼叫dev_close(),該函式中會呼叫dev_deactivate()關閉流量控制排隊規則.
void dev_deactivate(struct net_device *dev)
{
struct Qdisc *qdisc;
struct sk_buff *skb;
int running;
spin_lock_bh(&dev->queue_lock);
qdisc = dev->qdisc;
//將當前生效的排隊規則設定為noop_qdisc,該排隊規則無法傳送資料
dev->qdisc = &noop_qdisc;
//呼叫Qdis_ops->reset()介面,復位之前設定的排隊規則
qdisc_reset(qdisc);
skb = dev->gso_skb;
dev->gso_skb = NULL;
spin_unlock_bh(&dev->queue_lock);
kfree_skb(skb);
//停止網路裝置watch dog
dev_watchdog_down(dev);
/* Wait for outstanding qdisc-less dev_queue_xmit calls. */
synchronize_rcu();
/* Wait for outstanding qdisc_run calls. */
//等待所有傳送過程結束
do {
while (test_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))
yield();
/*
* Double-check inside queue lock to ensure that all effects
* of the queue run are visible when we return.
*/
spin_lock_bh(&dev->queue_lock);
running = test_bit(__LINK_STATE_QDISC_RUNNING, &dev->state);
spin_unlock_bh(&dev->queue_lock);
/*
* The running flag should never be set at this point because
* we've already set dev->qdisc to noop_qdisc *inside* the same
* pair of spin locks. That is, if any qdisc_run starts after
* our initial test it should see the noop_qdisc and then
* clear the RUNNING bit before dropping the queue lock. So
* if it is set here then we've found a bug.
*/
} while (WARN_ON_ONCE(running));
}
/* Under dev->queue_lock and BH! */
void qdisc_reset(struct Qdisc *qdisc)
{
const struct Qdisc_ops *ops = qdisc->ops;
//呼叫排隊規則的reset()介面
if (ops->reset)
ops->reset(qdisc);
}
EXPORT_SYMBOL(qdisc_reset);