1. 程式人生 > 程式設計 >使用Go基於WebSocket構建千萬級視訊直播彈幕系統的程式碼詳解

使用Go基於WebSocket構建千萬級視訊直播彈幕系統的程式碼詳解

(1)業務複雜度介紹

開門見山,假設一個直播間同時500W人線上,那麼1秒鐘1000條彈幕,那麼彈幕系統的推送頻率就是: 500W * 1000條/秒=50億條/秒 ,想想B站2019跨年晚會那次彈幕系統得是多麼的NB,況且一個大型網站不可能只有一個直播間!

使用Go基於WebSocket構建千萬級視訊直播彈幕系統的程式碼詳解

使用Go做WebSocket開發無非就是三種情況:

  • 使用Go原生自帶的庫,也就是 golang.org/x/net ,但是這個官方庫真是出了奇Bug多
  • 使用GitHub大佬 gorilla/websocket 庫,可以結合到某些Web開發框架,比如Gin、iris等,只要使用的框架式基於 golang.org/net 的,那麼這個庫就可以與這個框架結合
  • 手擼一個WebSocket框架

根據估算結果,彈幕推送量很大的時候,Linux核心將會出現瓶頸,因為Linux核心傳送TCP包的時候極限包傳送頻率是100W。因此可以將同一秒內的彈幕訊息合併為1條推送,減少網路小資料包的傳送,從而降低推送頻率。

彈幕系統需要維護線上的使用者長連線來實現定向推送到線上的使用者,通常是使用Hash字典結構,通常推送訊息就是遍歷線上用的Hash字典。在彈幕推送期間使用者在不斷的上下線,為了維護上線使用者,那麼就得不斷的修改Hash字典,不斷地進行鎖操作,使用者量過大導致鎖瓶頸。因此可以將整個Hash結構拆分為多個Hash結構,分別對多個Hash結構加不同的鎖,並且使用讀寫鎖替代互斥鎖。

通常伺服器與客戶端互動使用JSON結構,那麼需要不斷的編碼解碼JSON資料,這將會導致CPU瓶頸。將訊息先進行合併,然後進行編碼,最後輪詢Hash結構進行推送。

以上是單體架構存在的問題,為了支援更多的使用者負載,通常彈幕系統採用分散式架構,進行彈性擴容縮容。

(2)推送還是拉取?

如果是客戶端拉取伺服器端資料,那麼將會存在以下幾個問題:

  • 直播線上人數多就意味著訊息資料更新頻率高,拉取訊息意味著彈幕無法滿足時效性
  • 如果很多客戶端同時拉取,那麼伺服器端的壓力無異於DDOS
  • 一個彈幕系統應該是通用的,因此對於直播間彈幕較少的場景,意味著訊息資料拉取請求都是無效的

因此我們考慮推送模式:當資料發生更新的時候伺服器端主動推送到客戶端,這樣可以有效減少客戶端的請求次數。如果需要實現訊息推送,那麼就意味著伺服器端維護大量的長連線。

(3)為什麼使用WebSocket?

實現彈幕訊息的實時更新一定是使用Socket的方式,那麼為啥要使用WebSocket呢?現在大部分直播應用的開發都是跨平臺的,然而跨平臺的開發框架本質就是Web開發,那麼一定離不開WebSocket,而且一部分使用者會選擇在Web端看視訊,比如Bilibili,現如今也有一些桌面應用是用Electron等跨平臺框架開發的,比如Lark飛書等,因此實現訊息推送的最佳方案就是使用WebSocket。

使用WebSocket可以輕鬆的維持伺服器端長連線,其次WebSocket是架構在HTTP協議之上的,並且也可以使用HTTPS方式,因此WebSocket是可靠傳輸,並且不需要開發者關注底層細節。

使用Go基於WebSocket構建千萬級視訊直播彈幕系統的程式碼詳解

為啥要使用Go搞WebSocket呢?首先說到WebSocket你可能會想到Node.js,但是Node.js是單執行緒模型,如果實現高併發,不得不建立多個Node.js程序,但是這又不容易服務端遍歷整個連線集合;如果使用Java就會顯得比較笨重,Java專案的部署,編寫Dockerfile都不如Go的目標二進位制更加簡潔,並且Go協程很容易實現高併發,上一章說到Go語言目前也有成熟的WebSocket輪子。

(4)服務端基本Demo

首先搭建好一個框架:

package main

import (
  "fmt"
  "net/http"
)

func main() {
 fmt.Println("Listen localhost:8080")
   // 註冊一個用於WebSocket的路由,實際業務中不可能只有一個路由
  http.HandleFunc("/messages",messageHandler)
  // 監聽8080埠,沒有實現服務異常處理器,因此第二個引數是nil
  http.ListenAndServe("localhost:8080",nil)
}

func messageHandler(response http.ResponseWriter,request *http.Request) {
  // TODO: 實現訊息處理
  response.Write([]byte("HelloWorld"))
}

然後完善messageHandler函式:

func messageHandler(response http.ResponseWriter,request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允許跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },}

  // 建立連線
  conn,err := upgrader.Upgrade(response,request,nil)
  if err != nil {
    return
  }

  // 收發訊息
  for {
    // 讀取訊息
    _,bytes,err := conn.ReadMessage()
    if err != nil {
      _ = conn.Close()
    }
    // 寫入訊息
    err = conn.WriteMessage(websocket.TextMessage,bytes)
    if err != nil {
      _ = conn.Close()
    }
  }
}

現在基本上實現了WebSocket功能,但是websocket的原生API不是執行緒安全的(Close方法是執行緒安全的,並且是可重入的),並且其他模組無法複用業務邏輯,因此進行封裝:

  • 封裝Connection物件描述一個WebSocket連線
  • 為Connection物件提供執行緒安全的關閉、接收、傳送API
// main.go
package main

import (
  "bluemiaomiao.github.io/websocket-go/service"
  "fmt"
  "net/http"

  "github.com/gorilla/websocket"
)

func main() {
  fmt.Println("Listen localhost:8080")
  http.HandleFunc("/messages",messageHandler)
  _ = http.ListenAndServe("localhost:8080",nil)
  wsConn,err := service.Create(conn)
  if err != nil {
    return
  }

  // 收發訊息
  for {
    // 讀取訊息
    msg,err := wsConn.ReadOne()
    if err != nil {
      wsConn.Close()
    }
    // 寫入訊息
    err = wsConn.WriteOne(msg)
    if err != nil {
      _ = conn.Close()
    }
  }
}
// service/messsage_service.go
package service

import (
  "errors"
  "github.com/gorilla/websocket"
  "sync"
)

// 封裝的連線物件
// 
// 由於websocket的Close()方法是可重入的,所以可以多次呼叫,但是關閉Channel的close()
// 方法不是可重入的,因此通過isClosed進行判斷
// isClosed可能發生資源競爭,因此通過互斥鎖避免
// 關閉websocket連線後,也要自動關閉輸入輸出訊息流,因此通過signalCloseLoopChan實現
type Connection struct {
  conn                  *websocket.Conn  // 具體的連線物件
  inputStream             chan []byte       // 輸入流,使用Channel模擬
  outputStream           chan []byte       // 輸出流,使用chaneel模擬
  signalCloseLoopChan     chan byte       // 關閉訊號
  isClosed               bool            // 是否呼叫過close()方法
  lock                   sync.Mutex      // 簡單的鎖
}

// 用於初始化一個連線物件
func Create(conn *websocket.Conn) (connection *Connection,err error) {
  connection = &Connection{
    conn:              conn,inputStream:        make(chan []byte,1000),outputStream:       make(chan []byte,signalCloseLoopChan: make(chan byte,1),isClosed:            false,}

  // 啟動讀寫迴圈
  go connection.readLoop()
  go connection.writeLoop()
  return
}

// 讀取一條訊息
func (c *Connection) ReadOne() (msg []byte,err error) {
  select {
  case msg = <-(*c).inputStream:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// 寫入一條訊息
func (c *Connection) WriteOne(msg []byte) (err error) {
  select {
  case (*c).outputStream <- msg:
  case <-(*c).signalCloseLoopChan:
    err = errors.New("connection is closed")
  }
  return
}

// 關閉連線物件
func (c *Connection) Close() {
  _ = (*c).conn.Close()
  (*c).lock.Lock()
  if !(*c).isClosed {
    close((*c).signalCloseLoopChan)
  }
  (*c).lock.Unlock()

}

// 讀取迴圈
func (c *Connection) readLoop() {
  // 不停的讀取長連線中的訊息,只要存在訊息就將其放到佇列中
  for {
    _,err := (*c).conn.ReadMessage()
    if err != nil {
      (*c).Close()
    }
    select {
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    case (*c).inputStream <- bytes:
    }
  }
}

// 寫入迴圈
func (c *Connection) writeLoop() {
  // 只要佇列中存在訊息,就將其寫入
  var data []byte
  for {
    select {
    case data = <-(*c).outputStream:
    case <-(*c).signalCloseLoopChan:
      (*c).Close()
    }
    err := (*c).conn.WriteMessage(websocket.TextMessage,data)
    if err != nil {
      _ = (*c).conn.Close()
    }
  }
}

至此,你已經學會了如何使用Go構建WebSocket服務。

到此這篇關於使用Go基於WebSocket構建千萬級視訊直播彈幕系統的程式碼詳解的文章就介紹到這了,更多相關go WebSocket視訊直播彈幕內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!