1. 程式人生 > 實用技巧 >【計算機網路】Stanford CS144 Lab Assignments 學習筆記

【計算機網路】Stanford CS144 Lab Assignments 學習筆記

本文為我的斯坦福計算機網路課的程式設計實驗(Lab Assignments)的學習總結。課程全稱:CS 144: Introduction to Computer Networking。

事情發生於我讀了半本《計算機網路:自頂向下方法》後,想要找點練手的東西,碰巧在知乎上看到了這個推薦帖:CS144: 什麼,你學不會TCP?那就來自己寫一個吧!。這門課的作業要求實現一個簡單的TCP協議,自帶充足評測程式,同時又比較有挑戰性,我便欣然做之。

LAB0

在我開始做實驗的時候官方不知為何已經刪掉了sponge的github程式碼倉庫,不過慶幸的是有huangrt01老哥早早做完了實驗把自己的程式碼發到了github上,我克隆到本地後利用git回退到最初狀態就能做了。不過因為CS144官方不喜歡大夥兒把完成的程式碼公開到網上,防止後來的學生抄作業,所以本文釋出時huangrt01已經把倉庫給設為private了。咱這裡給出自己做完後打包好的倉庫,寄存在gitee上(老外應該不會上gitee吧)。

另一件尷尬的事兒是CS144的2020年開了新課,舊版網頁被覆蓋掉了,我在gitee上拉了倉庫映象,克隆到本地後回退到2019年的最後一次提交,然後開啟index.html就可以正常使用了。

CS144官方網頁:https://cs144.github.io/

官網備份:https://gitee.com/kangyupl/cs144.github.io

實驗備份:https://gitee.com/kangyupl/sponge

開工準備

首先要安裝g++-8clang-6,請根據自己的linux發行版自行搜尋對應方法。我用的是g++-8,如果安裝後CMAKE的時候還是提示g++版本不夠,那就百度一下怎麼把用g++-8

替代gcc,再不行的話那就為gcc-8建立一個名為cc的軟連線,為g++-8建立一個名為c++的軟連結。

另外,如果你CMAKE的時候報出瞭如下錯誤:

CMake Error: The following variables are used in this project, but they are set to NOTFOUND.
Please set them or make sure they are set and tested correctly in the CMake files:
LIBPCAP
    linked by target "udp_tcpdump" in directory /home/kangyu/sponge/apps
    linked by target "ipv4_parser" in directory /home/kangyu/sponge/tests
    linked by target "ipv4_parser" in directory /home/kangyu/sponge/tests
    linked by target "tcp_parser" in directory /home/kangyu/sponge/tests
    linked by target "tcp_parser" in directory /home/kangyu/sponge/tests

此時安裝libpcap-dev庫來解決,大多數的Linux發行版的軟體源中應該都有這玩意。

Writing webget

要求實現get_URL函式,功能為向指定IP地址傳送HTTP GET請求,然後輸出所有響應。可參考配套Doc中[TCPSocket的示例程式碼](https://cs144.github.io/doc/lab0/class_t_c_p_socket.html。此外多讀讀講義提示,注意下EOFshutdown()的引數即可。

webget.cc

void get_URL(const string &host, const string &path) {

    TCPSocket sock{};
    sock.connect(Address(host,"http"));
    sock.write("GET "+path+" HTTP/1.1\r\nHost: "+host+"\r\n\r\n");
    sock.shutdown(SHUT_WR);
    while(!sock.eof()){
        cout<<sock.read();
    }
    sock.close();
    return;
}

An in-memory reliable byte stream

要求實現一個有序位元組流類(in-order byte stream),使之支援讀寫、容量控制。這個位元組流類似於一個帶容量的佇列,從一頭讀,從另一頭寫。當流中的資料達到容量上限時,便無法再寫入新的資料。特別的,寫操作被分為了peek和pop兩步。peek為從頭部開始讀取指定數量的位元組,pop為彈出指定數量的位元組。

第一反應是搞個迴圈佇列,容器基於陣列,長度等於容量,這樣記憶體被充分利用,效率也不錯。不過講義要求我們用“Modern C++”,避免用普通指標,所以我退而求其次用std::deque代替。為什麼不用std::queue?因為queue只能訪問開頭的節點,無法實現peek操作。

byte_stream.hh

class ByteStream {
  private:
    // Your code here -- add private members as necessary.
    std::deque<char> _buffer = {};
    size_t _capacity = 0;
    size_t _read_count = 0;
    size_t _write_count = 0;
    bool _input_ended_flag = false;
    bool _error = false;  //!< Flag indicating that the stream suffered an error.
    //......

byte_stream.cc

ByteStream::ByteStream(const size_t capacity) : _capacity(capacity) {}

size_t ByteStream::write(const string &data) {
    size_t len = data.length();
    if (len > _capacity - _buffer.size()) {
        len = _capacity - _buffer.size();
    }
    _write_count += len;
    for (size_t i = 0; i < len; i++) {
        _buffer.push_back(data[i]);
    }
    return len;
}

//! \param[in] len bytes will be copied from the output side of the buffer
string ByteStream::peek_output(const size_t len) const {
    size_t length = len;
    if (length > _buffer.size()) {
        length = _buffer.size();
    }
    return string().assign(_buffer.begin(), _buffer.begin() + length);
}

//! \param[in] len bytes will be removed from the output side of the buffer
void ByteStream::pop_output(const size_t len) {
    size_t length = len;
    if (length > _buffer.size()) {
        length = _buffer.size();
    }
    _read_count += length;
    while (length--) {
        _buffer.pop_front();
    }
    return;
}

void ByteStream::end_input() { _input_ended_flag = true; }

bool ByteStream::input_ended() const { return _input_ended_flag; }

size_t ByteStream::buffer_size() const { return _buffer.size(); }

bool ByteStream::buffer_empty() const { return _buffer.size() == 0; }

bool ByteStream::eof() const { return buffer_empty() && input_ended(); }

size_t ByteStream::bytes_written() const { return _write_count; }

size_t ByteStream::bytes_read() const { return _read_count; }

size_t ByteStream::remaining_capacity() const { return _capacity - _buffer.size(); }

除錯方法論

講一下使用vscode時debug的方法:

如下圖,在本次check中,測試樣例t_strm_reassem_single出錯。

在啃了會兒makefile翻了翻目錄後,可以在sponge/build/tests/目錄下發現這一測試樣例對應的程式。

對應的原始檔存放在sponge/tests/

如果使用GDB的話,現在可以直接通過gdb 程式路徑進行除錯。不過我使用的是基於GDB外掛的vscode,需要對launch.json做點小修改。我把修改的行加了註釋。

{
	"version": "0.2.0",
	"configurations": [
		{
			"name": "sponge debug",//!挑個容易識別的名字
			"type": "cppdbg",
			"request": "launch",
			"program": "${workspaceFolder}/build/tests/${fileBasenameNoExtension}", //!設定為測試程式原始碼相對應的目標程式路徑
			"args": [],
			"stopAtEntry": false,
			"cwd": "${workspaceFolder}",
			"environment": [],
			"externalConsole": false,
			"MIMode": "gdb",
			"setupCommands": [
				{
					"description": "為 gdb 啟用整齊列印",
					"text": "-enable-pretty-printing",
					"ignoreFailures": true
				}
			],
			//"preLaunchTask": "C/C++: g++-8 build active file",  //!不需要前置任務
			"miDebuggerPath": "/usr/bin/gdb"
		}
	]
}

此時在vscode中切回fsm_stream_reassembler_harness.cc,打完斷點後即可正常除錯

LAB1

要求實現一個流重組器(stream reassembler),可以將帶索引的位元組流碎片重組成有序的位元組流。每個位元組流碎片都通過索引、長度、內容三要素進行描述。重組完的位元組流應當被送入指定的位元組流(byte stream)物件_output中。

特別注意:

0.這節需要安裝pcap庫和pcap-dev庫才能正常編譯,如果沒編譯沒報錯那就沒事了。

1.碎片可能交叉或重疊。

2.如果某次新碎片到達後位元組流的開頭部分被湊齊,那就應當立刻把湊齊的部分立刻寫入到_output中。即對應講義中的:

When should bytes be written to the stream?

As soon as possible. The only situation in which a byte should not be in the stream is that when there is a byte before it that has not been “pushed” yet.

3.碎片可能是一個只包含EOF標誌的空串

4.LAB0的順序位元組流和LAB1的流重組器各有各的容量限制。流重組器把位元組流寫滿後,只有當位元組流騰出空後才能繼續寫,相當於位元組流滿時流重組器出口被“堵住”了。同樣當流重組器容量滿了後自身也無法被寫入新資料,此時到來的新碎片只能被丟棄掉。

第一反應聯想到了作業系統裡的程序記憶體管理,用一個二叉排序樹來記錄每個碎片的索引、長度,排序規則為按索引值升序,每次插入新碎片時判斷能不能和前後碎片進行合併。流的內容則可以用一個數組來做緩衝區,或者乾脆一塊儲存在二叉樹的節點裡。不過還是因為“Modern C++”的緣故,我再次退而求其次用std::list代替之。等我哼哧哼哧花了好幾個小時寫完LAB1後,又哼哧哼哧得改了一眾BUG後,才想起std::set底層就是用紅黑樹實現的,可以直接拿來用。

“哼哼--哼哼哼---哼哼哼哼----啊啊啊啊啊啊啊啊”阿宅大哭。

最終實現與上文願景差不多,用一個block_node結構體來存放每個碎片的索引、長度、內容。又因為set排序實現基於對應節點型別的小於運算子規則,所以我把block_node結構體的小於運算子過載為按索引值升序。再簡單說下我的push_substring處理流程:

  • 容量判斷:滿了就立刻返回。
  • 處理子串的冗餘字首:如果子串包含已經被寫入位元組流的部分,就把這部分剪掉。
  • 合併子串:運用set自帶的lowerbound快速確定插入位置,前後重複比較,用個自己寫的子函式判斷重疊的字順便合併之。
  • 寫入位元組流:如果流重組器頭部非空,就把頭部寫入位元組流,並更新指示頭部的遊標。
  • EOF判斷

stream_reassembler.hh

class StreamReassembler {
  private:
    // Your code here -- add private members as necessary.
    struct block_node {
        size_t begin = 0;
        size_t length = 0;
        std::string data = "";
        bool operator<(const block_node t) const { return begin < t.begin; }
    };
    std::set<block_node> _blocks = {};
    std::vector<char> _buffer = {};
    size_t _unassembled_byte = 0;
    size_t _head_index = 0;
    bool _eof_flag = false;
    ByteStream _output;  //!< The reassembled in-order byte stream
    size_t _capacity;    //!< The maximum number of bytes

    //! merge elm2 to elm1, return merged bytes
    long merge_block(block_node &elm1, const block_node &elm2);
    //......

stream_reassembler.cc

StreamReassembler::StreamReassembler(const size_t capacity) : _output(capacity), _capacity(capacity) {
    _buffer.resize(capacity);
}

long StreamReassembler::merge_block(block_node &elm1, const block_node &elm2) {
    block_node x, y;
    if (elm1.begin > elm2.begin) {
        x = elm2;
        y = elm1;
    } else {
        x = elm1;
        y = elm2;
    }
    if (x.begin + x.length < y.begin) {
        return -1;  // no intersection, couldn't merge
    } else if (x.begin + x.length >= y.begin + y.length) {
        elm1 = x;
        return y.length;
    } else {
        elm1.begin = x.begin;
        elm1.data = x.data + y.data.substr(x.begin + x.length - y.begin);
        elm1.length = elm1.data.length();
        return x.begin + x.length - y.begin;
    }
}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    if (index >= _head_index + _capacity) {  // capacity over
        return;
    }

    // handle extra substring prefix
    block_node elm;
    if (index + data.length() <= _head_index) {  // couldn't equal, because there have emtpy substring
        goto JUDGE_EOF;
    } else if (index < _head_index) {
        size_t offset = _head_index - index;
        elm.data.assign(data.begin() + offset, data.end());
        elm.begin = index + offset;
        elm.length = elm.data.length();
    } else {
        elm.begin = index;
        elm.length = data.length();
        elm.data = data;
    }
    _unassembled_byte += elm.length;

    // merge substring
    do {
        // merge next
        long merged_bytes = 0;
        auto iter = _blocks.lower_bound(elm);
        while (iter != _blocks.end() && (merged_bytes = merge_block(elm, *iter)) >= 0) {
            _unassembled_byte -= merged_bytes;
            _blocks.erase(iter);
            iter = _blocks.lower_bound(elm);
        }
        // merge prev
        if (iter == _blocks.begin()) {
            break;
        }
        iter--;
        while ((merged_bytes = merge_block(elm, *iter)) >= 0) {
            _unassembled_byte -= merged_bytes;
            _blocks.erase(iter);
            iter = _blocks.lower_bound(elm);
            if (iter == _blocks.begin()) {
                break;
            }
            iter--;
        }
    } while (false);
    _blocks.insert(elm);

    // write to ByteStream
    if (!_blocks.empty() && _blocks.begin()->begin == _head_index) {
        const block_node head_block = *_blocks.begin();
        // modify _head_index and _unassembled_byte according to successful write to _output
        size_t write_bytes = _output.write(head_block.data);
        _head_index += write_bytes;
        _unassembled_byte -= write_bytes;
        _blocks.erase(_blocks.begin());
    }

JUDGE_EOF:
    if (eof) {
        _eof_flag = true;
    }
    if (_eof_flag && empty()) {
        _output.end_input();
    }
}

size_t StreamReassembler::unassembled_bytes() const { return _unassembled_byte; }

bool StreamReassembler::empty() const { return _unassembled_byte == 0; }

LAB2

Sequence Numbers

要求實現序列號、絕對序列號與流索引間的轉換。照著講義的表格寫就行:

Sequence Numbers Absolute Sequence Numbers Stream Indices
Start at the ISN Start at 0 Start at 0
Include SYN/FIN Include SYN/FIN Omit SYN/FIN
32 bits, wrapping 64 bits, non-wrapping 64 bits, non-wrapping
“seqno” “absolute seqno” “stream index”

需要提一下的地方有checkpoint表示最近一次轉換求得的absolute seqno,而本次轉換出的absolute seqno應該選擇與上次值最為接近的那一個。原理是雖然segment不一定按序到達,但幾乎不可能出現相鄰到達的兩個segment序號差值超過INT32_MAX的情況,除非延遲以年為單位,或者產生了位元差錯(後面的LAB可能涉及)。

實際操作就是把算出來的 絕對序號分別加減1ul << 32做比較,選擇與checkpoing差的絕對值最小的那個。

wrapping_integers.cc

WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) { 
    return WrappingInt32(static_cast<uint32_t>(n) + isn.raw_value()); 
}

uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
    uint32_t offset = n.raw_value() - isn.raw_value();
    uint64_t t = (checkpoint & 0xFFFFFFFF00000000) + offset;
    uint64_t ret = t;
    if (abs(int64_t(t + (1ul << 32) - checkpoint)) < abs(int64_t(t - checkpoint)))
        ret = t + (1ul << 32);
    if (t >= (1ul << 32) && abs(int64_t(t - (1ul << 32) - checkpoint)) < abs(int64_t(ret - checkpoint)))
        ret = t - (1ul << 32);
    return ret;
}

Implementing the TCP receiver

要求實現一個基於滑動視窗的TCP接收端,類似於我在《計算機網路:自定向下方法》裡看的選擇重傳協議,不過多了點小細節。上個圖:

整個接收端的空間由視窗空間(基於StreamReassmbler)和緩衝區空間(基於ByteStream)兩部分共享。需要注意視窗長度等於接收端容量減去還留在緩衝區的位元組數,只有當位元組從緩衝區讀出後窗口長度才能縮減。

這節的資料結構部分在前兩個LAB已經實現完了,我們只要寫點業務邏輯就行,就是瑣碎的細節多了點,需要瘋狂除錯瘋狂修改。一些容易出問題的細節我都標註在程式碼裡了。

為了達成本節任務,我在StreamReassembler里加了倆getter介面:

stream_reassembler.hh

class StreamReassembler {
    public:
	size_t head_index() const { return _head_index; }
    bool input_ended() const { return _output.input_ended(); }
    //......
}

tcp_receiver.hh

class TCPReceiver {
  private:
    //! Our data structure for re-assembling bytes.
    StreamReassembler _reassembler;
    bool _syn_flag = false;
    bool _fin_flag = false;
    size_t _base = 0;  // when unintital, equal zero for ackno special judge
    size_t _isn = 0;
    //! The maximum number of bytes we'll store.
    size_t _capacity;
    //......

tcp_receiver.cc

bool TCPReceiver::segment_received(const TCPSegment &seg) {
    bool ret = false;
    static size_t abs_seqno = 0;
    size_t length;
    if (seg.header().syn) {
        if (_syn_flag) {  // already get a SYN, refuse other SYN.
            return false;
        }
        _syn_flag = true;
        ret = true;
        _isn = seg.header().seqno.raw_value();
        abs_seqno = 1;
        _base = 1;
        length = seg.length_in_sequence_space() - 1;
        if (length == 0) {  // segment's content only have a SYN flag
            return true;
        }
    } else if (!_syn_flag) {  // before get a SYN, refuse any segment
        return false;
    } else {  // not a SYN segment, compute it's abs_seqno
        abs_seqno = unwrap(WrappingInt32(seg.header().seqno.raw_value()), WrappingInt32(_isn), abs_seqno);
        length = seg.length_in_sequence_space();
    }

    if (seg.header().fin) {
        if (_fin_flag) {  // already get a FIN, refuse other FIN
            return false;
        }
        _fin_flag = true;
        ret = true;
    }
    // not FIN and not one size's SYN, check border
    else if (seg.length_in_sequence_space() == 0 && abs_seqno == _base) {
        return true;
    } else if (abs_seqno >= _base + window_size() || abs_seqno + length <= _base) {
        if (!ret)
            return false;
    }

    _reassembler.push_substring(seg.payload().copy(), abs_seqno - 1, seg.header().fin);
    _base = _reassembler.head_index() + 1;
    if (_reassembler.input_ended())  // FIN be count as one byte
        _base++;
    return true;
}

optional<WrappingInt32> TCPReceiver::ackno() const {
    if (_base > 0)
        return WrappingInt32(wrap(_base, WrappingInt32(_isn)));
    else
        return std::nullopt;
}

size_t TCPReceiver::window_size() const { return _capacity - _reassembler.stream_out().buffer_size(); }

LAB3

這節課實現的是TCP的傳送方,難度比上節要高一些,看完講義後我一頭霧水,不知如何下手。到最後抄了huangrt01兄的作業才算是理明白了。以下記錄下我迷惑的幾個點:

第一個迷惑點是書上描述選擇重傳的這張圖:

這副圖裡用的是“分別確認”的協議,即傳送方收到一個ackno只代表接收方收到了該ackno對應的那個段。而我們在這個lab用的是基於“累計確認”的ARQ協議,即傳送方收到一個ackno代表了接收方已經收到ackno之前的所有段。原先的時候我花了大部分的時間思考怎麼設計分別確認的資料結構,又怎樣來跟蹤重傳的段。換成累計確認就簡單多了,直接一個queue就行,重傳的時候只傳頭就行。

第二個迷惑點是接收方發出的首個段應是隻包含SYN的段,用作第一次握手。課本里是這麼講的,但講義裡沒說,所以我就按首個段也可以存資料的邏輯寫了不少錯程式碼。

第三個迷惑點是計時器啟動的時機。講義裡在多個地方都提到了重啟計時器,搞得我很混亂。這裡推薦課本里的圖3-33:

下面是最終程式碼

tcp_sender.hh

class TCPSender{
    private:
    //......
    
    std::queue<TCPSegment> _segments_outstanding{};
    size_t _bytes_in_flight = 0;
    size_t _recv_ackno = 0;
    bool _syn_flag = false;
    bool _fin_flag = false;
    size_t _window_size = 0;

    size_t _timer = 0;
    bool _timer_running = false;
    size_t _retransmission_timeout = 0;
    size_t _consecutive_retransmission = 0;

    void send_segment(TCPSegment &seg);
    //......

tcp_sender.cc

TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity)
    , _retransmission_timeout(retx_timeout) {}

uint64_t TCPSender::bytes_in_flight() const { return _bytes_in_flight; }

void TCPSender::fill_window(bool send_syn) {
    // sent a SYN before sent other segment
    if (!_syn_flag) {
        if (send_syn) {
            TCPSegment seg;
            seg.header().syn = true;
            send_segment(seg);
            _syn_flag = true;
        }
        return;
    }

    // take window_size as 1 when it equal 0
    size_t win = _window_size > 0 ? _window_size : 1;
    size_t remain;  // window's free space
    // when window isn't full and never sent FIN
    while ((remain = win - (_next_seqno - _recv_ackno)) != 0 && !_fin_flag) {
        size_t size = min(TCPConfig::MAX_PAYLOAD_SIZE, remain);
        TCPSegment seg;
        string str = _stream.read(size);
        seg.payload() = Buffer(std::move(str));
        // add FIN
        if (seg.length_in_sequence_space() < win && _stream.eof()) {
            seg.header().fin = true;
            _fin_flag = true;
        }
        // stream is empty
        if (seg.length_in_sequence_space() == 0) {
            return;
        }
        send_segment(seg);
    }
}

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
//! \returns `false` if the ackno appears invalid (acknowledges something the TCPSender hasn't sent yet)
bool TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    size_t abs_ackno = unwrap(ackno, _isn, _recv_ackno);
    // out of window, invalid ackno
    if (abs_ackno > _next_seqno) {
        return false;
    }

    // if ackno is legal, modify _window_size before return
    _window_size = window_size;

    // ack has been received
    if (abs_ackno <= _recv_ackno) {
        return true;
    }
    _recv_ackno = abs_ackno;

    // pop all elment before ackno
    while (!_segments_outstanding.empty()) {
        TCPSegment seg = _segments_outstanding.front();
        if (unwrap(seg.header().seqno, _isn, _next_seqno) + seg.length_in_sequence_space() <= abs_ackno) {
            _bytes_in_flight -= seg.length_in_sequence_space();
            _segments_outstanding.pop();
        } else {
            break;
        }
    }

    fill_window();

    _retransmission_timeout = _initial_retransmission_timeout;
    _consecutive_retransmission = 0;

    // if have other outstanding segment, restart timer
    if (!_segments_outstanding.empty()) {
        _timer_running = true;
        _timer = 0;
    }
    return true;
}

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
    _timer += ms_since_last_tick;
    if (_timer >= _retransmission_timeout && !_segments_outstanding.empty()) {
        _segments_out.push(_segments_outstanding.front());
        _consecutive_retransmission++;
        _retransmission_timeout *= 2;
        _timer_running = true;
        _timer = 0;
    }
    if (_segments_outstanding.empty()) {
        _timer_running = false;
    }
}

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmission; }

void TCPSender::send_empty_segment() {
    // empty segment doesn't need store to outstanding queue
    TCPSegment seg;
    seg.header().seqno = wrap(_next_seqno, _isn);
    _segments_out.push(seg);
}

void TCPSender::send_empty_segment(WrappingInt32 seqno) {
    // empty segment doesn't need store to outstanding queue
    TCPSegment seg;
    seg.header().seqno = seqno;
    _segments_out.push(seg);
}

void TCPSender::send_segment(TCPSegment &seg) {
    seg.header().seqno = wrap(_next_seqno, _isn);
    _next_seqno += seg.length_in_sequence_space();
    _bytes_in_flight += seg.length_in_sequence_space();
    _segments_outstanding.push(seg);
    _segments_out.push(seg);
    if (!_timer_running) {  // start timer
        _timer_running = true;
        _timer = 0;
    }
}

LAB4

這節課要求實現一個TCPConnection類,主要功能有:封裝TCPSenderTCPReceiver;構建TCP的有限狀態機(FSM)。雖然講義裡說這節課不需要設計啥新東西,只要拼拼湊湊就行,但實際實現難度比前四個實驗加起來還難。主要難點在於TCP的FSM涉及到12種狀態間的轉換,需要很多的細節邏輯來控制。並且老師說因為不想讓大家“面向樣例程式設計”,所以LAB4之前的幾個LAB的測試樣例並非“全面”的,到了LAB4再給你套“完備”的測試,這就導致很多前幾個LAB潛在的BUG都集中在LAB4裡爆發出來,需要修改前面實現過的程式碼才行。

思路分析也實在沒啥好說的了,去網上狂啃資料把TCP的FSM吃透,把講義完全吃透,把官網FAQ裡的東西吃透,把libsponge/tcp_helpers/tcp_state.cc吃透。然後瘋狂的改BUG吧。

我敢肯定你大把的時間最後都會花在研究下面這三幅圖上的:

雖然不想講實現思路,但在編寫過程中有幾個惱人的環境問題我還是要提一下:

tun.cc編譯錯誤

如果你編譯的時候爆出瞭如下錯誤:

[  5%] Building CXX object libsponge/CMakeFiles/sponge.dir/util/tun.cc.o
In file included from /home/kang/sponge/libsponge/util/tun.cc:8:
/usr/include/linux/if.h:211:19: error: field ‘ifru_addr’ has incomplete type ‘sockaddr’
   struct sockaddr ifru_addr;
                   ^~~~~~~~~
In file included from /usr/include/linux/if.h:23,
                 from /home/kang/sponge/libsponge/util/tun.cc:8:
/usr/include/linux/socket.h:19:27: note: forward declaration of ‘struct sockaddr’
 } __attribute__ ((aligned(_K_SS_ALIGNSIZE))); /* force desired alignment */
                           ^~~~~~~~~~~~~~~

請參考interfaces code fails with incomplete type 'struct sockaddr',在libsponge/util/tun.cc中新增

#include <sys/socket.h>

make check超時或隨機幾個樣例報錯

天朝特色網路問題,因為網站伺服器在國外,所以多重試幾次吧。我本地執行出錯概率比較大,換用阿里的雲主機處出錯概率就比較小了。

最終程式碼如下:

tcp_connection.hh

class TCPConnection {
  private:   
    //......
    
    size_t _time_since_last_segment_received = 0;
    bool _active = true;
    bool _need_send_rst = false;
    bool _ack_for_fin_sent = false;

    bool push_segments_out(bool send_syn = false);
    void unclean_shutdown(bool send_rst);
    bool clean_shutdown();
    bool in_listen();
    bool in_syn_recv();
    bool in_syn_sent();
    //.......

tcp_connection.cc

size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }

size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }

size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }

size_t TCPConnection::time_since_last_segment_received() const { return _time_since_last_segment_received; }

void TCPConnection::segment_received(const TCPSegment &seg) {
    if (!_active)
        return;
    _time_since_last_segment_received = 0;

    // data segments with acceptable ACKs should be ignored in SYN_SENT
    if (in_syn_sent() && seg.header().ack && seg.payload().size() > 0) {
        return;
    }
    bool send_empty = false;
    if (_sender.next_seqno_absolute() > 0 && seg.header().ack) {
        // unacceptable ACKs should produced a segment that existed
        if (!_sender.ack_received(seg.header().ackno, seg.header().win)) {
            send_empty = true;
        }
    }

    bool recv_flag = _receiver.segment_received(seg);
    if (!recv_flag) {
        send_empty = true;
    }

    if (seg.header().syn && _sender.next_seqno_absolute() == 0) {
        connect();
        return;
    }

    if (seg.header().rst) {
        // RST segments without ACKs should be ignored in SYN_SENT
        if (in_syn_sent() && !seg.header().ack) {
            return;
        }
        unclean_shutdown(false);
        return;
    }

    if (seg.length_in_sequence_space() > 0) {
        send_empty = true;
    }

    if (send_empty) {
        if (_receiver.ackno().has_value() && _sender.segments_out().empty()) {
            _sender.send_empty_segment();
        }
    }
    push_segments_out();
}

bool TCPConnection::active() const { return _active; }

size_t TCPConnection::write(const string &data) {
    size_t ret = _sender.stream_in().write(data);
    push_segments_out();
    return ret;
}

//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
    if (!_active)
        return;
    _time_since_last_segment_received += ms_since_last_tick;
    _sender.tick(ms_since_last_tick);
    if (_sender.consecutive_retransmissions() > TCPConfig::MAX_RETX_ATTEMPTS) {
        unclean_shutdown(true);
    }
    push_segments_out();
}

void TCPConnection::end_input_stream() {
    _sender.stream_in().end_input();
    push_segments_out();
}

void TCPConnection::connect() {
    // when connect, must active send a SYN
    push_segments_out(true);
}

TCPConnection::~TCPConnection() {
    try {
        if (active()) {
            // Your code here: need to send a RST segment to the peer
            cerr << "Warning: Unclean shutdown of TCPConnection\n";
            unclean_shutdown(true);
        }
    } catch (const exception &e) {
        std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
    }
}

bool TCPConnection::push_segments_out(bool send_syn) {
    // default not send syn before recv a SYN
    _sender.fill_window(send_syn || in_syn_recv());
    TCPSegment seg;
    while (!_sender.segments_out().empty()) {
        seg = _sender.segments_out().front();
        _sender.segments_out().pop();
        if (_receiver.ackno().has_value()) {
            seg.header().ack = true;
            seg.header().ackno = _receiver.ackno().value();
            seg.header().win = _receiver.window_size();
        }
        if (_need_send_rst) {
            _need_send_rst = false;
            seg.header().rst = true;
        }
        _segments_out.push(seg);
    }
    clean_shutdown();
    return true;
}

void TCPConnection::unclean_shutdown(bool send_rst) {
    _receiver.stream_out().set_error();
    _sender.stream_in().set_error();
    _active = false;
    if (send_rst) {
        _need_send_rst = true;
        if (_sender.segments_out().empty()) {
            _sender.send_empty_segment();
        }
        push_segments_out();
    }
}

bool TCPConnection::clean_shutdown() {
    if (_receiver.stream_out().input_ended() && !(_sender.stream_in().eof())) {
        _linger_after_streams_finish = false;
    }
    if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) {
        if (!_linger_after_streams_finish || time_since_last_segment_received() >= 10 * _cfg.rt_timeout) {
            _active = false;
        }
    }
    return !_active;
}

bool TCPConnection::in_listen() { return !_receiver.ackno().has_value() && _sender.next_seqno_absolute() == 0; }

bool TCPConnection::in_syn_recv() { return _receiver.ackno().has_value() && !_receiver.stream_out().input_ended(); }

bool TCPConnection::in_syn_sent() {
    return _sender.next_seqno_absolute() > 0 && _sender.bytes_in_flight() == _sender.next_seqno_absolute();
}

通關截圖:

效能測試結果:

本地的WSL,Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz,Ubuntu 18.04.5 LTS

kang@DICE-KNIGHT:~/sponge/build$ ./apps/tcp_benchmark
CPU-limited throughput                : 0.67 Gbit/s
CPU-limited throughput with reordering: 0.52 Gbit/s

阿里雲主機,Intel(R) Xeon(R) CPU E5-2682 v4 @ 2.50GHz,CentOS Linux release 7.8.2003 (Core)

[kang@toy build]$ ./apps/tcp_benchmark
CPU-limited throughput                : 0.63 Gbit/s
CPU-limited throughput with reordering: 0.53 Gbit/s

已經大大超過了講義要去的最低效能0.1Gbit/s,所以可以宣告完工了!

瓶頸優化

別以為這樣就結束了!

雖然講義要求最低達到0.1Gbit/s即可,可講義裡的示例可是我們效率的數倍啊!

user@computer: ~ /sponge/build$ ./apps/tcp benchmark
CPU-limited throughput : 1.78 Gbit/s
CPU-limited throughput with reordering: 1.21 Gbit/s

為了挑戰這一數值我們需要利用之前看過的《深入理解計算機系統》中效能優化的知識。

第一步,查詢效能瓶頸

首先修改sponge/etc/cflags.cmake中的編譯引數,將-g改為-Og -pg,使生成的程式具有分析程式可用的連結資訊。

然後編譯並執行一遍benchmark程式,並將分析結果寫入文字中

make -j8
./apps/tcp_benchmark
gprof ./apps/tcp_benchmark > prof.txt
vim prof.txt

可以看到write,pop,peek三個操作公共佔據了80%左右的執行時間,所以我們主要對ByteStream類進行優化。

做這裡我去狂補了一通std::move,assign,右值引用等C++11新特性,然後把sponge/libsponge/util/buffer.cc的程式碼翻了個底朝天,終於領悟了原來是要用這裡面 BufferList類來作為ByteStream的容器,使得原來基於記憶體拷貝的儲存方法變為基於記憶體所有權轉移的儲存方法。

比較尷尬的是雖然給了BufferList的程式碼,但沒有示例,使用方法只能多看看原始碼了。

最終程式碼:

byte_stream.hh

#include "util/buffer.hh"
class ByteStream {
  private:
	BufferList _buffer = {};
    
    //......

byte_stream.cc

//......

size_t ByteStream::write(const string &data) {
    size_t len = data.length();
    if (len > _capacity - _buffer.size()) {
        len = _capacity - _buffer.size();
    }
    _write_count += len;
    _buffer.append(BufferList(move(string().assign(data.begin(),data.begin()+len))));
    return len;
}

//! \param[in] len bytes will be copied from the output side of the buffer
string ByteStream::peek_output(const size_t len) const {
    size_t length = len;
    if (length > _buffer.size()) {
        length = _buffer.size();
    }
    string s=_buffer.concatenate();
    return string().assign(s.begin(), s.begin() + length);
}

//! \param[in] len bytes will be removed from the output side of the buffer
void ByteStream::pop_output(const size_t len) {
    size_t length = len;
    if (length > _buffer.size()) {
        length = _buffer.size();
    }
    _read_count += length;
    _buffer.remove_prefix(length);
    return;
}

//......

最終成績:

本地的WSL,Intel(R) Core(TM) i5-6200U CPU @ 2.30GHz,Ubuntu 18.04.5 LTS

kang@DICE-KNIGHT:~/sponge/build$ ./apps/tcp_benchmark
CPU-limited throughput                : 1.84 Gbit/s
CPU-limited throughput with reordering: 0.64 Gbit/s

阿里雲主機,Intel(R) Xeon(R) CPU E5-2682 v4 @ 2.50GHz,CentOS Linux release 7.8.2003 (Core)

[kang@toy build]$ ./apps/tcp_benchmark
CPU-limited throughput                : 1.83 Gbit/s
CPU-limited throughput with reordering: 1.14 Gbit/s

可以看到經過優化後的程式碼在CentOS的阿里雲主機上的效能已經達到了講義所給示例的水平。但本地的reordering能力比雲主機差半截,一開始懷疑是WSL的問題,後來我換了幾臺Ubuntu系統的電腦做測試,reordering始終不高,可能是Ubuntu特性吧。

不論怎樣,停停走走歷時二十多天,我們最終實現了一個能用的TCP協議,值得歡呼了。