1. 程式人生 > >Linux socket 資料傳送類函式實現(三)

Linux socket 資料傳送類函式實現(三)

注:本文分析基於3.10.0-693.el7核心版本,即CentOS 7.4

上回我們說到send四兄弟在sock_sendmsg()完成集合,write家族也在__sock_sendmsg()處等待。

其實sock_sendmsg()的下一步就是進入__sock_sendmsg(),所以我們就從__sock_sendmsg()開始,帶著這些兵馬再次出發吧。

int sock_sendmsg(struct socket *sock, struct msghdr *msg, size_t size)
{
    struct kiocb iocb;
    struct sock_iocb siocb;
    int
ret; //構造IO請求 init_sync_kiocb(&iocb, NULL); iocb.private = &siocb; //對於write家族,並不經過sock_sendmsg,直接橫刀立馬,在此等候 ret = __sock_sendmsg(&iocb, sock, msg, size); if (-EIOCBQUEUED == ret) ret = wait_on_sync_kiocb(&iocb); return ret; }

進入大一統的__sock_sendmsg

static
inline int __sock_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t size) { //安全檢查 int err = security_socket_sendmsg(sock, msg, size); return err ?: __sock_sendmsg_nosec(iocb, sock, msg, size); }

還記得sendmmsg裡,在傳送多個訊息是,為了提高效能會呼叫sock_sendmsg_nosec()

,這是為了跳過上面的安全檢查,提高效能。簡單檢查後就進入__sock_sendmsg_nosec()

static inline int __sock_sendmsg_nosec(struct kiocb *iocb, struct socket *sock,
                       struct msghdr *msg, size_t size)
{
    struct sock_iocb *si = kiocb_to_siocb(iocb);
    //對sock_iocb結構進行賦值
    si->sock = sock;
    si->scm = NULL;
    si->msg = msg;//訊息地址
    si->size = size;//訊息長度
    //呼叫tcp層傳送函式
    return sock->ops->sendmsg(iocb, sock, msg, size);
}

在TCP協議中,sock->ops指向的是inet_stream_ops,在socket系統呼叫時賦值,

const struct proto_ops inet_stream_ops = {
    ...
    .sendmsg       = inet_sendmsg,
    .recvmsg       = inet_recvmsg,
    ...
};

由此可知,呼叫的是inet_sendmsg()函式,

int inet_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
         size_t size)
{
    struct sock *sk = sock->sk;

    sock_rps_record_flow(sk);

    /* We may need to bind the socket. */
    //如果沒有繫結埠,那就先繫結
    if (!inet_sk(sk)->inet_num && !sk->sk_prot->no_autobind &&
        inet_autobind(sk))
        return -EAGAIN;
    //又是一層指標
    return sk->sk_prot->sendmsg(iocb, sk, msg, size);
}

簡單處理後,又進了sk->sk_prot的sendmsg,對於TCP而言,sk->sk_prot指向tcp_prot

struct proto tcp_prot = {
    .name           = "TCP",
    ...
    .recvmsg        = tcp_recvmsg,
    .sendmsg        = tcp_sendmsg,
    ...
};

這兩個結構體用到的還是很頻繁的,可以多點關注。

所以層層深入,走向tcp_sendmsg()函式,也是夠曲折的,

int tcp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        size_t size)
{
    struct iovec *iov;
    struct tcp_sock *tp = tcp_sk(sk);
    struct sk_buff *skb;
    int iovlen, flags, err, copied = 0;
    int mss_now = 0, size_goal, copied_syn = 0, offset = 0;
    bool sg;
    long timeo;

    lock_sock(sk);

    flags = msg->msg_flags;
    if (flags & MSG_FASTOPEN) {//快速開啟選項,暫不分析
        err = tcp_sendmsg_fastopen(sk, msg, &copied_syn, size);
        if (err == -EINPROGRESS && copied_syn > 0)
            goto out;
        else if (err)
            goto out_err;
        offset = copied_syn;
    }
    //如果是阻塞呼叫,獲取傳送超時時間,超時時間可通過SO_SNDTIMEO選項設定
    timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);

    //等待連線建立,使用快速開啟選項的被動連線端可不用等待連線完全建立即可傳送資料
    if (((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) &&
        !tcp_passive_fastopen(sk)) {
        if ((err = sk_stream_wait_connect(sk, &timeo)) != 0)
            goto do_error;
    }
    ...
    /* This should be in poll */
    clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
    //size_goal表示GSO支援的大小,為mss的整數倍,不支援GSO時則和mss相等
    mss_now = tcp_send_mss(sk, &size_goal, flags);

    /* Ok commence sending. */
    iovlen = msg->msg_iovlen;
    iov = msg->msg_iov;
    copied = 0;

    err = -EPIPE;
    if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
        goto out_err;
    //確定網絡卡是否支援分散聚合
    sg = !!(sk->sk_route_caps & NETIF_F_SG);
    //遍歷每一個數據塊,傳送資料
    while (--iovlen >= 0) {
        size_t seglen = iov->iov_len;//資料長度
        unsigned char __user *from = iov->iov_base;//資料起始地址

        iov++;
        if (unlikely(offset > 0)) {  /* Skip bytes copied in SYN */
            //如果是快速開啟的連線,減去在SYN報文中傳送的資料
            if (offset >= seglen) {
                offset -= seglen;
                continue;
            }
            seglen -= offset;
            from += offset;
            offset = 0;
        }

        while (seglen > 0) {
            int copy = 0;
            int max = size_goal;
            //獲取傳送佇列的最後一個skb
            skb = tcp_write_queue_tail(sk);
            if (tcp_send_head(sk)) {//還有未傳送資料,即skb還未傳送
                if (skb->ip_summed == CHECKSUM_NONE)
                    max = mss_now;
                copy = max - skb->len;//能追加資料的長度
            }

            if (copy <= 0) {//無法追加資料,需要重新分配一個skb來存放資料
new_segment:
                //如果傳送佇列的總大小sk_wmem_queued大於等於傳送快取的上限sk_sndbuf
                //等待發送快取釋放
                if (!sk_stream_memory_free(sk))
                    goto wait_for_sndbuf;
                //重新分配skb結構
                skb = sk_stream_alloc_skb(sk, select_size(sk, sg), sk->sk_allocation);
                if (!skb)
                    goto wait_for_memory;

                if (sk->sk_route_caps & NETIF_F_CSUM_MASK)
                    skb->ip_summed = CHECKSUM_PARTIAL;
                //把新分配的skb加入傳送佇列的尾部,
                //同時增加發送佇列的大小,減小預分配快取的大小
                skb_entail(sk, skb);
                copy = size_goal;
                max = size_goal;

                if (tp->repair)
                    TCP_SKB_CB(skb)->sacked |= TCPCB_REPAIRED;
            }

            //本次可追加資料的資料足以滿足此次資料傳送,因此拷貝數設定為資料長度
            if (copy > seglen)
                copy = seglen;

            if (skb_availroom(skb) > 0) {
                /* skb線性區還有空間,那就先使用這部分空間 */
                copy = min_t(int, copy, skb_availroom(skb));
                //拷貝使用者資料到核心空間,同時計算校驗和
                err = skb_add_data_nocache(sk, skb, from, copy);
                if (err)
                    goto do_fault;
            } else {
                //線性區無空間,使用分頁區
                bool merge = true;
                int i = skb_shinfo(skb)->nr_frags;//獲取skb分頁數
                struct page_frag *pfrag = sk_page_frag(sk);
                //檢查分頁是否有空間,沒有則申請新的分頁
                if (!sk_page_frag_refill(sk, pfrag))
                    goto wait_for_memory;
                //判斷是否能往最後一個分頁追加資料,即資料是否能合併
                if (!skb_can_coalesce(skb, i, pfrag->page, pfrag->offset)) {
                    //如果分頁數量已達上限,或者網絡卡不支援分散聚合
                    //將資料包設定PSH標誌,以便儘快傳送,然後重新申請新skb存放資料
                    if (i == MAX_SKB_FRAGS || !sg) {
                        tcp_mark_push(tp, skb);
                        goto new_segment;
                    }
                    merge = false;//無法合併
                }
                //可用於追加資料的空間
                copy = min_t(int, copy, pfrag->size - pfrag->offset);

                if (!sk_wmem_schedule(sk, copy))
                    goto wait_for_memory;
                //拷貝使用者資料到核心skb的分頁空間,同時計算校驗和。
                //更新skb的長度欄位,更新sock的傳送佇列大小和預分配快取
                err = skb_copy_to_page_nocache(sk, from, skb, pfrag->page,
                                   pfrag->offset, copy);
                if (err)
                    goto do_error;

                /* Update the skb. */
                if (merge) {
                    //如果可合併資料,更新資料長度統計
                    skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
                } else {
                    //將新申請的分頁掛到skb上,並更新分頁數統計
                    skb_fill_page_desc(skb, i, pfrag->page, pfrag->offset, copy);
                    get_page(pfrag->page);
                }
                pfrag->offset += copy;//更新分頁偏移位置
            }
            //如果第一次拷貝資料,不設定PSH標記
            if (!copied)
                TCP_SKB_CB(skb)->tcp_flags &= ~TCPHDR_PSH;

            tp->write_seq += copy;//更新發送佇列序號
            TCP_SKB_CB(skb)->end_seq += copy;//更新skb資料結束序號
            skb_shinfo(skb)->gso_segs = 0;//清零gso_segs,在tcp_write_xmit中重新計算

            from += copy;//更新使用者資料起始位置偏移
            copied += copy;//已拷貝資料統計
            if ((seglen -= copy) == 0 && iovlen == 0)//資料拷貝完畢,跳出迴圈
                goto out;

            if (skb->len < max || (flags & MSG_OOB) || unlikely(tp->repair))
                continue;
            //如果需要設定PSH標記
            if (forced_push(tp)) {
                //設定PSH標記
                tcp_mark_push(tp, skb);
                //將傳送佇列的資料報文儘可能的傳送出去
                __tcp_push_pending_frames(sk, mss_now, TCP_NAGLE_PUSH);
            } else if (skb == tcp_send_head(sk))
                //否則只發送當前這一個skb
                tcp_push_one(sk, mss_now);
            continue;

wait_for_sndbuf:
            set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
wait_for_memory:
            //如果已經有資料拷貝至傳送佇列,那就先把這部分發送,再等待記憶體釋放
            if (copied)
                tcp_push(sk, flags & ~MSG_MORE, mss_now,
                     TCP_NAGLE_PUSH, size_goal);

            if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
                goto do_error;

            mss_now = tcp_send_mss(sk, &size_goal, flags);
        }
    }

out:
    //如果已經有資料拷貝至傳送佇列,那就傳送資料
    if (copied)
        tcp_push(sk, flags, mss_now, tp->nonagle, size_goal);
out_nopush:
    release_sock(sk);
    return copied + copied_syn;

do_fault:
    if (!skb->len) {
        tcp_unlink_write_queue(skb, sk);
        /* It is the one place in all of TCP, except connection
         * reset, where we can be unlinking the send_head.
         */
        tcp_check_send_head(sk, skb);
        sk_wmem_free_skb(sk, skb);
    }

do_error:
    if (copied + copied_syn)
        goto out;
out_err:
    err = sk_stream_error(sk, flags, err);
    release_sock(sk);
    return err;
}

可見,tcp_sendmsg()只要是在處理使用者資料的存放,優先考慮報文的線性區,然後是分頁區,必要時需要使用新skb或者新分頁來存放使用者資料。

拷貝好使用者資料後,接下來就是傳送資料,主要涉及__tcp_push_pending_framestcp_push_onetcp_push這三個傳送函式。實際上tcp_push_one是tcp_push的一種特殊形式,且tcp_push簡單封裝後也會呼叫__tcp_push_pending_frames,因此我們主要介紹tcp_push這個傳送函式。

static void tcp_push(struct sock *sk, int flags, int mss_now,
             int nonagle, int size_goal)
{
    struct tcp_sock *tp = tcp_sk(sk);
    struct sk_buff *skb;
    //是否有資料需要傳送
    if (!tcp_send_head(sk))
        return;
    //獲取傳送佇列中的最後一個skb
    skb = tcp_write_queue_tail(sk);
    //判斷是否需要設定PSH標記
    if (!(flags & MSG_MORE) || forced_push(tp))
        tcp_mark_push(tp, skb);
    //如果設定了MSG_OOB選項,就記錄緊急指標
    tcp_mark_urg(tp, flags);
    //判斷是否需要阻塞小包,將多個小資料段合併到一個skb中一起傳送
    if (tcp_should_autocork(sk, skb, size_goal)) {

        /* avoid atomic op if TSQ_THROTTLED bit is already set */
        //設定TSQ_THROTTLED標誌
        if (!test_bit(TSQ_THROTTLED, &tp->tsq_flags)) {
            NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPAUTOCORKING);
            set_bit(TSQ_THROTTLED, &tp->tsq_flags);
        }
        /* It is possible TX completion already happened
         * before we set TSQ_THROTTLED.
         */
        //有可能在設定TSQ_THROTTLED前,網絡卡tx ring已經完成傳送
        //因此再次檢查條件,避免錯誤阻塞資料報文
        if (atomic_read(&sk->sk_wmem_alloc) > skb->truesize)
            return;//返回,阻塞小包傳送
    }
    //應用程式用MSG_MORE標識告訴4層將會有更多的小資料包的傳輸
    //然後將這個標記再傳遞給3層,3層就會提前劃分一個mtu大小的資料包,來組合這些資料幀
    if (flags & MSG_MORE)
        //標記cork,阻塞
        nonagle = TCP_NAGLE_CORK;
    //TCP層還沒處理完,接著往下走
    __tcp_push_pending_frames(sk, mss_now, nonagle);
}

tcp_push()中涉及到小包阻塞的問題,使用了TSQ機制,即TCP Small Queue,通過tcp_should_autocork()判斷是否開啟。

static bool tcp_should_autocork(struct sock *sk, struct sk_buff *skb, int size_goal)
{
    return skb->len < size_goal && //資料報文長度小於最大報文值,也就是小包
           sysctl_tcp_autocorking && //預設開啟,由/proc/sys/net/ipv4/tcp_autocorking控制
           skb != tcp_write_queue_head(sk) && //該報文不是第一個將要傳送的報文,即還有其他資料等待發送
           //網絡卡佇列中還有資料,sk_wmem_alloc的值在報文傳送後會減小
           atomic_read(&sk->sk_wmem_alloc) > skb->truesize; 
}

可以看出,其基本思想就是利用資料報文傳送的這段時間,將小包儘量組合成大包傳送,既減小發送頻寬,同時也不會降低傳輸速率(網絡卡和傳送佇列有報文傳送時才阻塞,無報文等待發送時,就直接傳送,不阻塞)。

如果沒有阻塞,則呼叫__tcp_push_pending_frames()繼續往下遞交資料傳送,

void __tcp_push_pending_frames(struct sock *sk, unsigned int cur_mss, int nonagle)
{
    //如果連線處於關閉狀態,返回不處理
    if (unlikely(sk->sk_state == TCP_CLOSE))
        return;
    //tcp_write_xmit會再次判斷是否要推遲資料傳送,使用nagle演算法
    if (tcp_write_xmit(sk, cur_mss, nonagle, 0, sk_gfp_atomic(sk, GFP_ATOMIC)))
        //傳送失敗,檢測是否需要開啟零視窗探測定時器
        tcp_check_probe_timer(sk);
}

上面有講到使用TSQ機制判斷是否要阻塞小包,進入tcp_write_xmit()後還會使用nagle演算法判斷是否 要阻塞傳送。

static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle,
               int push_one, gfp_t gfp)
{
    struct tcp_sock *tp = tcp_sk(sk);
    struct sk_buff *skb;
    unsigned int tso_segs, sent_pkts;
    int cwnd_quota;
    int result;
    bool is_cwnd_limited = false;
    u32 max_segs;

    sent_pkts = 0;
    //如果只發送一個數據報文,則不做MTU探測
    if (!push_one) {
        //MTU探測
        result = tcp_mtu_probe(sk);
        if (!result) {
            return false;
        } else if (result > 0) {
            sent_pkts = 1;
        }
    }
    //計算TSO支援的最大segs數量
    max_segs = tcp_tso_autosize(sk, mss_now);
    //遍歷傳送佇列裡的報文,將其傳送出去
    while ((skb = tcp_send_head(sk))) {
        unsigned int limit;
        //重新計算gso_segs,因為在tcp_sendmsg中被清零了
        //在支援GSO的情況下值為skb->len/mss_now
        tso_segs = tcp_init_tso_segs(sk, skb, mss_now);
        BUG_ON(!tso_segs);

        if (unlikely(tp->repair) && tp->repair_queue == TCP_SEND_QUEUE) {
            /* "skb_mstamp" is used as a start point for the retransmit timer */
            skb_mstamp_get(&skb->skb_mstamp);
            goto repair; /* Skip network transmission */
        }
        //根據視窗大小計算此時能發多少個segment
        cwnd_quota = tcp_cwnd_test(tp, skb);
        if (!cwnd_quota) {
            is_cwnd_limited = true;
            if (push_one == 2)
                /* Force out a loss probe pkt. */
                cwnd_quota = 1;
            else
                break;
        }
        //檢測當前視窗是否能傳送skb的第一個分段
        if (unlikely(!tcp_snd_wnd_test(tp, skb, mss_now)))
            break;

        if (tso_segs == 1) { //tso_segs = 1表示無需TSO分段
            //根據nagle演算法判斷是否需要推遲傳送報文
            //如果是傳送佇列裡最後一個報文,無需推遲
            if (unlikely(!tcp_nagle_test(tp, skb, mss_now, (tcp_skb_is_last(sk, skb) ?
                              nonagle : TCP_NAGLE_PUSH))))
                break;
        } else { //有TSO分段
            //有多個skb需要傳送,根據TSO規則判斷是否需要推遲傳送
            if (!push_one && tcp_tso_should_defer(sk, skb, &is_cwnd_limited, max_segs))
                break;
        }
        //走到這說明是要立即傳送的流程
        limit = mss_now;
        if (tso_segs > 1 && !tcp_urg_mode(tp))
            //根據視窗大小以及mss計算此時可傳送報文的大小限制
            limit = tcp_mss_split_point(sk, skb, mss_now,
                            min_t(unsigned int, cwnd_quota, max_segs), nonagle);
        //如果傳送的資料報文長度大於此時可傳送報文限制,那就要進行分段了
        //按照limit的大小進行分割skb
        if (skb->len > limit && unlikely(tso_fragment(sk, skb, limit, mss_now, gfp)))
            break;

        /* TCP Small Queues :
         * Control number of packets in qdisc/devices to two packets / or ~1 ms.
         * This allows for :
         *  - better RTT estimation and ACK scheduling
         *  - faster recovery
         *  - high rates
         * Alas, some drivers / subsystems require a fair amount
         * of queued bytes to ensure line rate.
         * One example is wifi aggregation (802.11 AMPDU)
         */
        //兩倍當前包大小或者每毫秒速率
        limit = max(2 * skb->truesize, sk->sk_pacing_rate >> 10);
        //不能大於sysctl_tcp_limit_output_bytes
        //由/proc/sys/net/ipv4/tcp_limit_output_bytes設定,預設為256K
        limit = min_t(u32, limit, sysctl_tcp_limit_output_bytes);
        //未傳送資料報文過多,先不往傳送佇列裡放資料了,設定TSQ_THROTTLED標記
        //往TSQ佇列裡放資料
        if (atomic_read(&sk->sk_wmem_alloc) > limit) {
            set_bit(TSQ_THROTTLED, &tp->tsq_flags);
            /* It is possible TX completion already happened
             * before we set TSQ_THROTTLED, so we must
             * test again the condition.
             * We abuse smp_mb__after_clear_bit() because
             * there is no smp_mb__after_set_bit() yet
             */
            smp_mb__after_clear_bit();
            //設定標記後再檢查一次,畢竟不是同步操作
            if (atomic_read(&sk->sk_wmem_alloc) > limit)
                break;
        }
        //傳送資料報文,往後會填充TCP頭部,然後交由IP層繼續封裝
        //這裡傳送的是資料的副本,由第三個引數控制,1即傳送副本
        //資料只有在收到ACK確認後才會從傳送佇列裡刪除
        if (unlikely(tcp_transmit_skb(sk, skb, 1, gfp)))
            break;

repair:
        /* Advance the send_head.  This one is sent out.
         * This call will increment packets_out.
         */
        tcp_event_new_data_sent(sk, skb);

        tcp_minshall_update(tp, mss_now, skb);
        sent_pkts += tcp_skb_pcount(skb);//統計傳送的報文數量

        if (push_one)
            break;
    }

    if (likely(sent_pkts)) {
        if (tcp_in_cwnd_reduction(sk))
            tp->prr_out += sent_pkts;

        /* Send one loss probe per tail loss episode. */
        if (push_one != 2)
            tcp_schedule_loss_probe(sk);
        tcp_cwnd_validate(sk, is_cwnd_limited);
        return false;
    }
    return (push_one == 2) || (!tp->packets_out && tcp_send_head(sk));
}

可見在一切順利的情況下,最後將發往tcp_transmit_skb()去填充TCP頭部,然後交往IP層繼續封裝。這裡我們就不再往下跟蹤了,有時間再看看IP層的處理。