1. 程式人生 > >小白的實時通訊之路

小白的實時通訊之路

1. 定義

1.1 背景介紹

通訊: 客戶端和服務端的一次互動過程。可以簡述為如下步驟:

  • 客戶端發出請求

  • 服務端接收,處理,返回結果

  • 客戶端接收結果

實時通訊:所謂「實時通訊」就是要求客戶端能夠收到服務端實時更新的結果,其實簡單理解就是在通訊這個詞上加了「實時的」這個形容詞,所以本質上通訊的雙方還是客戶端和服務端。舉個生活中實時通訊的例子,約會中,為了實時獲取到對方的位置,可能採用以下的方式,此處以 A 代表約會發起方(客戶端),B 代表約會接受方 (服務端)(純屬舉例):

  • A 每隔 3 分鐘給 B 播一通電話,確認 B 的位置,然後結束通話電話——polling

  • A 給 B 撥通電話後,保持著電話連線狀態,等 B 的位置向目的移動 5 公里以後給 A 回覆,然後結束通話電話——long polling

  • A 給 B 撥通電話後,始終保持著電話連線狀態,A 每隔 3 分鐘給問一次 B 的位置,B 收到請求前進 5 公里後給就給 A 一個回覆——long connection

  • A 給 B 撥通電話後,始終保持著電話連線狀態,同時雙方每前進 5 公斤就給對方一個回覆——websocket

1.2 概念介紹

polling: 客戶端每隔一段時間向服務端傳送請求來獲取資料。

long polling: 長輪詢是在建立連線以後保持,等待資料更新服務端推送更新資料再關閉。

  • 長輪詢必須使用長連結
  • 長輪詢是一種服務端推送資料的技術
  • 長輪詢實現服務端推資料的方式是 hold 住一個請求(建議設定超時時間),直到有資料更新則給客戶端響應。再次強調,長輪詢 hold 住的應該是一個 request 請求,不是 hold 住一個連線

long connection: 在連線建立以後,客戶端和服務端保持該連線直至有一方主動關閉連線,實質上保持該通道,實現多路複用。

  • 長連線描述的是 TCP 連線,允許複用該 TCP 連線上發起多次請求,使用長連結後並不是一次請求建立一個 TCP連線

websocket: 一種在單個 TCP 連線上進行全雙工通訊的協議,允許雙向資料傳輸,使得客戶端和服務端之間的資料交換變得更加簡單。

  • http 協議是無狀態的,客戶端發起一次 request, 服務端返回一次 response,這屬於被動式的「單工」通訊,但 websocket 是全雙工的,既允許客戶端向服務端發起請求,也允許服務端向客戶端推送資料
  • websocket 的出現是為了在應用層實現類似傳輸層的 TCP 協議

1.3 優缺點對比

優點 缺點 適用場景
polling 編寫簡單 頻繁建立、關閉連線,效率低下,浪費頻寬和伺服器資源
long polling 無更新的情況不會頻繁請求,減少連線建立的資源消耗 後端需單獨維護掛起連線,掛起的連線會消耗後端資源
long connection 多個請求複用同一連線,可用於實現訊息實時推送 後端需單獨維護長連結,消耗後端資源 直播,流媒體
web socket 真正意義上的長連結且全雙工 改造略複雜,前後端均需做支援
  • 從實現複雜程度上考慮, polling > long polling > long connection > websocket
  • 從效能方面考慮, websocket > long connection > long polling > polling

2. 實現

為了防止出現「懂了這麼多理論,還是寫不出程式碼」的情況出現,下面會開始介紹四種例子的實現,為了避免使用 hello world,筆者特意抽取日劇《非自然死亡》裡的經典臺詞做為傳輸內容。

2.1 polling 和 long polling

上述兩種方式的實現程式碼非常簡單,筆者這裡不做介紹,但會對上面的理論知識做驗證。

驗證:每次的請求是否會導致底層連線頻繁建立

  • 使用 net/http 建立 client,每間隔一秒輪詢一次服務端資料,資料截圖:

    • 通訊內容截圖:
      在這裡插入圖片描述

    注:通訊內容截圖沒什麼意義,單純展示下《非自然死亡》經典臺詞。

    • tcpdump 抓包截圖:
      在這裡插入圖片描述

    注:tcpdump 抓包截圖是在通訊過程中,對服務端 9162 埠抓包截圖,可以看出並沒有出現理論知識中的頻繁建立連線的過程。由於筆者用的 DefaultClient 而它底層用了 DefaultTransport,所以原因在這裡:

    DefaultTransport is the default implementation of Transport and is used by DefaultClient. It establishes network connections as needed and caches them for reuse by subsequent calls. It uses HTTP proxies as directed by the $HTTP_PROXY and $NO_PROXY (or $http_proxy and $no_proxy) environment variables.

    var DefaultTransport RoundTripper = &Transport{
            Proxy: ProxyFromEnvironment,
            DialContext: (&net.Dialer{
                    Timeout:   30 * time.Second,
                    KeepAlive: 30 * time.Second,
                    DualStack: true,
            }).DialContext,
            MaxIdleConns:          100,
            IdleConnTimeout:       90 * time.Second,
            TLSHandshakeTimeout:   10 * time.Second,
            ExpectContinueTimeout: 1 * time.Second,
    }
    
  • 改用瀏覽器手動呼叫輪詢介面,tcpdump 截圖如下:
    在這裡插入圖片描述

    注:確實瀏覽器手動呼叫發生了頻繁的連線建立與銷燬

結論: 如果使用 net/http 庫建立客戶端輪詢獲取服務端更新,並不會引起連線頻繁建立和銷燬,具體底層連線的保持引數可以調整。

2.2 long connection

長連結的程式碼實現,主要有兩個需要注意的地方,一個服務端在傳送資料後需要重新整理快取,另一個是客戶端如何解析服務端傳送的資料。

服務端重新整理快取:

type flushWriter struct {
	f http.Flusher
	w io.Writer
}

func (fw flushWriter) Write(p []byte) (n int, err error) {
	n, err = fw.w.Write(p)
	if fw.f != nil {
		fw.f.Flush()
	}
	return
}
func longConnection(w http.ResponseWriter, r *http.Request) {
	fw := flushWriter{w: w}
	if f, ok := w.(http.Flusher); ok {
		fw.f = f
	}
	var (
		cqtSize = len(classicQuoteTable) //[]string 型別,儲存經典語錄內容
		c       = make(chan classicQuote, 1)
		index   uint8
	)
	go func(c chan classicQuote) {
		for {
			time.Sleep(1 * time.Second)
			c <- classicQuote{
				Content: classicQuoteTable[int(index)%cqtSize],
			}
			index++
		}

	}(c)
	for {
		select {
		case r := <-c:
			br, err := json.Marshal(r)
			if err != nil {
				fmt.Println("longConnection marshal failed", err)
				break
			}
			fw.Write(br)
		}
	}
}

注:重新整理快取Golang 實現不帶緩衝的 http.ResponseWritter介紹的比較詳細,筆者此處不做過多介紹。

客戶端讀取資料

	client := &http.Client{
		Transport: &http.Transport{
			DialContext: (&net.Dialer{
				Timeout:   30 * time.Second,
				KeepAlive: 30 * time.Second,
			}).DialContext,
		},
	}

	for {
		rqst, err := http.NewRequest("GET", "http://127.0.0.1:9162/long_connection", nil)
		if err != nil {
			fmt.Println(err)
			return
		}
		resp, err := client.Do(rqst)
		if err != nil {
			fmt.Println(err)
			return
		}
		defer resp.Body.Close()
        //流式從 http response body 讀取服務端返回資料
		decoder := json.NewDecoder(resp.Body)
		cq := new(classicQuote)
		err = decoder.Decode(cq)   
		for err == nil {
			fmt.Printf("%v\n", cq)
			cq = new(classicQuote)
			err = decoder.Decode(cq)
		}
		fmt.Println("err", err)
	}

2.3 websocket

已經存在很多優秀的 websocket 庫,所以實現起來不復雜。golang.org/x/netgithub.com/gorilla 這兩個庫對比下來,後者的優勢更明顯,實現更多 RFC 6455 規範支援的特性,此處展示筆者使用後者庫的例子。

服務端

收到客戶端查詢經典語錄的訊息後,隨機選擇一條返回給客戶端。

type rqstMsg struct {
	Action string `json:"Query"`
}

var upgrader = websocket.Upgrader{}

func websocketHandle(w http.ResponseWriter, r *http.Request) {
	c, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		fmt.Printf("upgrader web socket failed, err %v\n", err)
		return
	}
	defer c.Close()
	var (
		index      int
		transferCh = make(chan struct{}, 1)
	)
	go func() {
		for {
			var r rqstMsg
			err := c.ReadJSON(&r)
			if err != nil {
				fmt.Printf("read err %v\n", err)
				return
			}
			fmt.Printf("recv: %s\n", r)
			transferCh <- struct{}{}
		}
	}()
	for {
		select {
		case <-transferCh:
			index++
			err = c.WriteJSON(classicQuoteTable[index%len(classicQuoteTable)])
			if err != nil {
				fmt.Printf("write err %v\n", err)
				break
			}
		}
	}
}

客戶端

間隔 30s 輪詢一次服務端經典語錄的內容。

type rqstMsg struct {
	Action string `json:"Query"` //websocket 連線上傳送的訊息格式
}

func main() {
	u := url.URL{
		Scheme: "ws",
		Host:   "127.0.0.1:9162",
		Path:   "web_socket",
	}
	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) //建立 websocket 連線
	if err != nil {
		fmt.Printf("dail websocket server error %v\n", err)
		return
	}
	defer c.Close()
	var (
		interrupt = make(chan os.Signal, 1)
		done      = make(chan struct{})
	)
	go func() {
		defer close(done)
		for {
			var respMsg string
			err := c.ReadJSON(&respMsg)
			if err != nil {
				fmt.Printf("read message error %v", err)
				return
			}
			fmt.Printf("recv message %s\n", respMsg)
		}
	}()
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	c.SetWriteDeadline(time.Now().Add(60 * time.Second)) //設定讀取超時時間,超時關閉連線
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			err := c.WriteJSON(rqstMsg{Action: "test"})
			if err != nil {
				log.Printf("write err: %v", err)
				return
			}
		case <-interrupt:
			log.Println("interrupt")
			err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
			if err != nil {
				log.Println("write close", err)
				return
			}
			select {
			case <-done:
			case <-time.After(time.Second):
			}
			return
		}
	}
}

注:筆者認為使用 websocket 協議的關鍵是要準確定義在 websocket 連線上傳送的訊息格式。

3. 思考

至此,這篇文章已全部總結完成,以上均為筆者個人理解,如有錯誤歡迎指出。

4. 參考資料