1. 程式人生 > >Golang 編寫 Tcp 伺服器

Golang 編寫 Tcp 伺服器

Golang 作為廣泛用於服務端和雲端計算領域的程式語言,tcp socket 是其中至關重要的功能。無論是 WEB 伺服器還是各類中介軟體都離不開 tcp socket 的支援。

  • Echo 伺服器
  • 拆包與粘包
  • 優雅關閉

與早期的每個執行緒持有一個 socket 的 block IO 模型不同, 多路IO複用模型使用單個執行緒監聽多個 socket, 當某個 socket 準備好資料後再進行響應。在邏輯上與使用 select 語句監聽多個 channel 的模式相同。

目前主要的多路IO複用實現主要包括: SELECT, POLL 和 EPOLL。 為了提高開發效率社群也出現很多封裝庫, 如Netty(Java), Tornado(Python) 和 libev(C)等。

Golang Runtime 封裝了各作業系統平臺上的多路IO複用介面, 並允許使用 goroutine 快速開發高效能的 tcp 伺服器。

Echo 伺服器

作為開始,我們來實現一個簡單的 Echo 伺服器。它會接受客戶端連線並將客戶端傳送的內容原樣傳回客戶端。

package main

import (
    "fmt"
    "net"
    "io"
    "log"
    "bufio"
)

func ListenAndServe(address string) {
    // 繫結監聽地址
    listener, err := net.Listen("tcp", address)
    if err != nil {
        log.Fatal(fmt.Sprintf("listen err: %v", err))
    }
    defer listener.Close()
    log.Println(fmt.Sprintf("bind: %s, start listening...", address))

    for {
        // Accept 會一直阻塞直到有新的連線建立或者listen中斷才會返回
        conn, err := listener.Accept()
        if err != nil {
            // 通常是由於listener被關閉無法繼續監聽導致的錯誤
            log.Fatal(fmt.Sprintf("accept err: %v", err))
        }
        // 開啟新的 goroutine 處理該連線
        go Handle(conn)
    }
}

func Handle(conn net.Conn) {
    // 使用 bufio 標準庫提供的緩衝區功能
    reader := bufio.NewReader(conn)
    for {
        // ReadString 會一直阻塞直到遇到分隔符 '\n'
        // 遇到分隔符後會返回上次遇到分隔符或連線建立後收到的所有資料, 包括分隔符本身
        // 若在遇到分隔符之前遇到異常, ReadString 會返回已收到的資料和錯誤資訊
        msg, err := reader.ReadString('\n')
        if err != nil {
            // 通常遇到的錯誤是連線中斷或被關閉,用io.EOF表示
            if err == io.EOF {
                log.Println("connection close")
            } else {
                log.Println(err)
            }
            return
        }
        b := []byte(msg)
        // 將收到的資訊傳送給客戶端
        conn.Write(b)
    }
}

func main() {
    ListenAndServe(":8000")
}

使用 telnet 工具測試我們編寫的 Echo 伺服器:

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
> a
a
> b
b
Connection closed by foreign host.

拆包與粘包

HTTP 等應用層協議只有收到一條完整的訊息後才能進行處理,而工作在傳輸層的TCP協議並不瞭解應用層訊息的結構。

因此,可能遇到一條應用層訊息分為兩個TCP包傳送或者一個TCP包中含有兩條應用層訊息片段的情況,前者稱為拆包後者稱為粘包。

在 Echo 伺服器的示例中,我們定義用\n表示訊息結束。我們可能遇到下列幾種情況:

  1. 收到兩個 tcp 包: "abc", "def\n", 應發出一條響應 "abcdef\n", 這是拆包的情況
  2. 收到一個 tcp 包: "abc\ndef\n", 應發出兩條響應 "abc\n", "def\n", 這是粘包的情況

當我們使用 tcp socket 開發應用層程式時必須正確處理拆包和粘包。

bufio 標準庫會快取收到的資料直到遇到分隔符才會返回,它可以正確處理拆包和粘包。

上層協議通常採用下列幾種思路之一來定義訊息,以保證完整地進行讀取:

  • 定長訊息
  • 在訊息尾部新增特殊分隔符,如示例中的Echo協議和FTP控制協議
  • 將訊息分為header 和 body, 並在 header 提供訊息總長度。這是應用最廣泛的策略,如HTTP協議。

優雅關閉

在生產環境下需要保證TCP伺服器關閉前完成必要的清理工作,包括將完成正在進行的資料傳輸,關閉TCP連線等。這種關閉模式稱為優雅關閉,可以避免資源洩露以及客戶端未收到完整資料造成異常。

TCP 伺服器的優雅關閉模式通常為: 先關閉listener阻止新連線進入,然後遍歷所有連線逐個進行關閉。

本節完整原始碼地址: https://github.com/HDT3213/godis/tree/master/src/server

首先修改一下TCP伺服器:

// handler 是應用層伺服器的抽象
type Handler interface {
    Handle(ctx context.Context, conn net.Conn)
    Close()error
}

func ListenAndServe(cfg *Config, handler tcp.Handler) {
    listener, err := net.Listen("tcp", cfg.Address)
    if err != nil {
        logger.Fatal(fmt.Sprintf("listen err: %v", err))
    }

    // 監聽中斷訊號
    // atomic.AtomicBool 是作者寫的封裝: https://github.com/HDT3213/godis/blob/master/src/lib/sync/atomic/bool.go
    var closing atomic.AtomicBool 
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    go func() {
        sig := <-sigCh
        switch sig {
        case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
            // 收到中斷訊號後開始關閉流程
            logger.Info("shuting down...")
            // 設定標誌位為關閉中, 使用原子操作保證執行緒可見性
            closing.Set(true)
            // listener 關閉後 listener.Accept() 會立即返回錯誤
            listener.Close() 
        }
    }()


    logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
    // 在出現未知錯誤或panic後保證正常關閉
    // 注意defer順序,先關閉 listener 再關閉應用層伺服器 handler
    defer handler.Close()
    defer listener.Close()
    ctx, _ := context.WithCancel(context.Background())
    for {
        conn, err := listener.Accept()
        if err != nil {
            if closing.Get() {
                // 收到關閉訊號後進入此流程,此時listener已被監聽系統訊號的 goroutine 關閉
                // handler 會被上文的 defer 語句關閉直接返回
                return 
            }
            logger.Error(fmt.Sprintf("accept err: %v", err))
            continue
        }
        // handle
        logger.Info("accept link")
        go handler.Handle(ctx, conn)
    }
}

接下來修改應用層伺服器:

// 客戶端連線的抽象
type Client struct {
    // tcp 連線
    Conn net.Conn
    // 當服務端開始傳送資料時進入waiting, 阻止其它goroutine關閉連線
    // wait.Wait是作者編寫的帶有最大等待時間的封裝: 
    // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go
    Waiting wait.Wait
}

type EchoHandler struct {
    
    // 儲存所有工作狀態client的集合(把map當set用)
    // 需使用併發安全的容器
    activeConn sync.Map 

    // 和 tcp server 中作用相同的關閉狀態標識位
    closing atomic.AtomicBool
}

func MakeEchoHandler()(*EchoHandler) {
    return &EchoHandler{
    }
}

// 關閉客戶端連線
func (c *Client)Close()error {
    // 等待資料傳送完成或超時
    c.Waiting.WaitWithTimeout(10 * time.Second)
    c.Conn.Close()
    return nil
}

func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) {
    if h.closing.Get() {
        // closing handler refuse new connection
        conn.Close()
    }

    client := &Client {
        Conn: conn,
    }
    h.activeConn.Store(client, 1)

    reader := bufio.NewReader(conn)
    for {
        msg, err := reader.ReadString('\n')
        if err != nil {
            if err == io.EOF {
                logger.Info("connection close")
                h.activeConn.Delete(conn)
            } else {
                logger.Warn(err)
            }
            return
        }
        // 傳送資料前先置為waiting狀態
        client.Waiting.Add(1)

        // 模擬關閉時未完成傳送的情況
        //logger.Info("sleeping")
        //time.Sleep(10 * time.Second)

        b := []byte(msg)
        conn.Write(b)
        // 傳送完畢, 結束waiting
        client.Waiting.Done()
    }
}

func (h *EchoHandler)Close()error {
    logger.Info("handler shuting down...")
    h.closing.Set(true)
    // TODO: concurrent wait
    h.activeConn.Range(func(key interface{}, val interface{})bool {
        client := key.(*Client)
        client.Close()
        return true
    })
    return nil
}