1. 程式人生 > 其它 >fasthttp:比net/http快十倍的Go框架(server 篇)

fasthttp:比net/http快十倍的Go框架(server 篇)

轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com/archives/574

我們在上一篇文章中講解了 Go HTTP 標準庫的實現原理,這一次我找到了一個號稱比net/http快十倍的Go框架 fasthttp,這次我們再來看看它有哪些優秀的設計值得我們去挖掘。

一個典型的 HTTP 服務應該如圖所示:

基於HTTP構建的服務標準模型包括兩個端,客戶端(Client)和服務端(Server)。HTTP 請求從客戶端發出,服務端接受到請求後進行處理然後將響應返回給客戶端。所以http伺服器的工作就在於如何接受來自客戶端的請求,並向客戶端返回響應。

這篇我們來講講 Server 端的實現。

實現原理

net/http 與 fasthttp 實現對比

我們在講 net/http 的時候講過,它的處理流程大概是這樣的:

  1. 註冊處理器到一個 hash 表中,可以通過鍵值路由匹配;
  2. 註冊完之後就是開啟迴圈監聽,每監聽到一個連線就會建立一個 Goroutine;
  3. 在建立好的 Goroutine 裡面會迴圈的等待接收請求資料,然後根據請求的地址去處理器路由表中匹配對應的處理器,然後將請求交給處理器處理;

這樣做在連線數比較少的時候是沒什麼問題的,但是在連線數非常多的時候,每個連線都會建立一個 Goroutine 就會給系統帶來一定的壓力。這也就造成了 net/http

在處理高併發時的瓶頸。

下面我們再看看 fasthttp 是如何做的:

  1. 啟動監聽;
  2. 迴圈監聽埠獲取連線;
  3. 獲取到連線之後首先會去 ready 佇列裡獲取 workerChan,獲取不到就會去物件池獲取;
  4. 將監聽的連線傳入到 workerChan 的 channel 中;
  5. workerChan 有一個 Goroutine 一直迴圈獲取 channel 中的資料,獲取到之後就會對請求進行處理然後返回。

上面有提到 workerChan 其實就是一個連線處理物件,這個物件裡面有一個 channel 用來傳遞連線;每個 workerChan 在後臺都會有一個 Goroutine 迴圈獲取 channel 中的連線,然後進行處理。如果沒有設定最大同時連線處理數的話,預設是 256 * 1024

個。這樣可以在併發很高的時候還可以同時保證對外提供服務。

除此之外,在實現上還通過 sync.Pool 來大量的複用物件,減少記憶體分配,如:

workerChanPool 、ctxPool 、readerPool、writerPool 等等多大30多個 sync.Pool 。

除了複用物件,fasthttp 還會切片,通過 s = s[:0]s = append(s[:0], b…)來減少切片的再次建立。

fasthttp 由於需要和 string 打交道的地方很多,所以還從很多地方儘量的避免[]byte到string轉換時帶來的記憶體分配和拷貝帶來的消耗 。

小結

綜上我們大致介紹了一下 fasthttp 提升效能的點:

  1. 控制非同步 Goroutine 的同時處理數量,最大預設是 256 * 1024個;
  2. 使用 sync.Pool 來大量的複用物件和切片,減少記憶體分配;
  3. 儘量的避免[]byte到string轉換時帶來的記憶體分配和拷貝帶來的消耗 ;

原始碼解析

我們以一個簡單的例子作為開始:

func main() { 
	if err := fasthttp.ListenAndServe(":8088", requestHandler); err != nil {
		log.Fatalf("Error in ListenAndServe: %s", err)
	}
}

func requestHandler(ctx *fasthttp.RequestCtx) {
	fmt.Fprintf(ctx, "Hello, world!\n\n")
}

我們呼叫 ListenAndServe 函式會啟動服務監聽,等待任務進行處理。ListenAndServe 函式實際上會呼叫到 Server 的 ListenAndServe 方法,這裡我們看一下 Server 結構體的欄位:

上圖簡單的列舉了一些 Server 結構體的常見欄位,包括:請求處理器、服務名、請求讀取超時時間、請求寫入超時時間、每個連線最大請求數等。除此之外還有很多其他引數,可以在各個維度上控制服務端的一些引數。

Server 的 ListenAndServe 方法會獲取 TCP 監聽,然後呼叫 Serve 方法執行服務端的邏輯處理。

Server 方法主要做了以下幾件事:

  1. 初始化並啟動 worker Pool;
  2. 接收請求 Connection;
  3. 將 Connection 交給 worker Pool 處理;
func (s *Server) Serve(ln net.Listener) error {
	...
	s.mu.Unlock()
	// 初始化 worker Pool
	wp := &workerPool{
		WorkerFunc:      s.serveConn,
		MaxWorkersCount: maxWorkersCount,
		LogAllErrors:    s.LogAllErrors,
		Logger:          s.logger(),
		connState:       s.setState,
	}
	// 啟動 worker Pool
	wp.Start() 
	// 迴圈處理 connection
	for {
		// 獲取 connection
		if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
			wp.Stop()
			if err == io.EOF {
				return nil
			}
			return err
		}
		s.setState(c, StateNew)
		atomic.AddInt32(&s.open, 1)
		// 處理 connection
		if !wp.Serve(c) {
			// 進入if 說明已到併發極限
			...
		}
		c = nil
	}
}

worker Pool

worker Pool 是用來處理所有請求 Connection 的,這裡稍微看看 workerPool 結構體的欄位:

  • WorkerFunc: 用來匹配請求對應的 handler 並執行;
  • MaxWorkersCount:最大同時處理的請求數;
  • ready:空閒的 workerChan;
  • workerChanPool:workerChan 的物件池,是一個 sync.Pool 型別的;
  • workersCount:目前正在處理的請求數;

下面我們看一下 workerPool 的 Start 方法:

func (wp *workerPool) Start() {
	if wp.stopCh != nil {
		panic("BUG: workerPool already started")
	}
	wp.stopCh = make(chan struct{})
	stopCh := wp.stopCh
    // 設定 worker Pool 的建立函式
	wp.workerChanPool.New = func() interface{} {
		return &workerChan{
			ch: make(chan net.Conn, workerChanCap),
		}
	}
	go func() {
		var scratch []*workerChan
		for {
            // 沒隔一段時間會清理空閒超時的 workerChan
			wp.clean(&scratch)
			select {
			case <-stopCh:
				return
			default:
                // 預設是 10 s
				time.Sleep(wp.getMaxIdleWorkerDuration())
			}
		}
	}()
}

Start 方法裡面主要是:

  1. 設定 workerChanPool 的建立函式;
  2. 啟動一個 Goroutine 定時清理 workerPool 中的 ready 中儲存的空閒 workerChan,預設每 10s 啟動一次。

獲取連線

func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
	for {
		c, err := ln.Accept()
		if err != nil {
			if c != nil {
				panic("BUG: net.Listener returned non-nil conn and non-nil error")
			}
			...
			return nil, io.EOF
		}
		if c == nil {
			panic("BUG: net.Listener returned (nil, nil)")
		}
        // 校驗每個ip對應的連線數
		if s.MaxConnsPerIP > 0 {
			pic := wrapPerIPConn(s, c)
			if pic == nil {
				if time.Since(*lastPerIPErrorTime) > time.Minute {
					s.logger().Printf("The number of connections from %s exceeds MaxConnsPerIP=%d",
						getConnIP4(c), s.MaxConnsPerIP)
					*lastPerIPErrorTime = time.Now()
				}
				continue
			}
			c = pic
		}
		return c, nil
	}
}

獲取連線其實沒什麼好說的,和 net/http 庫一樣呼叫的 TCPListener 的 accept 方法獲取 TCP Connection。

處理連線

處理連線部分首先會獲取 workerChan ,workerChan 結構體裡面包含了兩個欄位:lastUseTime、channel:

type workerChan struct {
	lastUseTime time.Time
	ch          chan net.Conn
}
  • lastUseTime 標識最後一次被使用的時間;

  • ch 是用來傳遞 Connection 用的。

獲取到 Connection 之後會傳入到 workerChan 的 channel 中,每個對應的 workerChan 都有一個非同步 Goroutine 在處理 channel 裡面的 Connection。

獲取 workerChan

func (wp *workerPool) Serve(c net.Conn) bool {
    // 獲取 workerChan 
	ch := wp.getCh()
	if ch == nil {
		return false
	}
    // 將 Connection 放入到 channel 中
	ch.ch <- c
	return true
}

Serve 方法主要是通過 getCh 方法獲取 workerChan ,然後將當前的 Connection 傳入到 workerChan 的 channel 中。

func (wp *workerPool) getCh() *workerChan {
	var ch *workerChan
	createWorker := false

	wp.lock.Lock()
	// 嘗試從空閒佇列裡獲取 workerChan
	ready := wp.ready
	n := len(ready) - 1
	if n < 0 {
		if wp.workersCount < wp.MaxWorkersCount {
			createWorker = true
			wp.workersCount++
		}
	} else {
		ch = ready[n]
		ready[n] = nil
		wp.ready = ready[:n]
	}
	wp.lock.Unlock()
	// 獲取不到則從物件池中獲取
	if ch == nil {
		if !createWorker {
			return nil
		}
		vch := wp.workerChanPool.Get()
		ch = vch.(*workerChan)
		// 為新的 workerChan 開啟 goroutine
		go func() {
			// 處理 channel 中的資料
			wp.workerFunc(ch)
			// 處理完之後重新放回到物件池中
			wp.workerChanPool.Put(vch)
		}()
	}
	return ch
}

getCh 方法首先會去 ready 空閒佇列中獲取 workerChan,如果獲取不到則從物件池中獲取,從物件池中獲取的新的 workerChan 會啟動 Goroutine 用來處理 channel 中的資料。

處理連線

func (wp *workerPool) workerFunc(ch *workerChan) {
	var c net.Conn

	var err error
	// 消費 channel 中的資料
	for c = range ch.ch {
		if c == nil {
			break
		}
		// 讀取請求資料並響應返回
		if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
			...
		} 
		c = nil
		// 將當前的 workerChan 放入的 ready 佇列中
		if !wp.release(ch) {
			break
		}
	}

	wp.lock.Lock()
	wp.workersCount--
	wp.lock.Unlock()
}

這裡會遍歷獲取 workerChan 的 channel 中的 Connection 然後執行 WorkerFunc 函式處理請求,處理完畢之後會將當前的 workerChan 重新放入到 ready 佇列中複用。

需要注意的是,這個迴圈會在獲取 Connection 為 nil 的時候跳出迴圈,這個 nil 是 workerPool 在非同步呼叫 clean 方法檢查該 workerChan 空閒時間超長了就會往 channel 中傳入一個 nil。

這裡設定的 WorkerFunc 函式是 Server 的 serveConn 方法,裡面會獲取到請求的引數,然後根據請求呼叫到對應的 handler 進行請求處理,然後返回 response,由於 serveConn 方法比較長這裡就不解析了,感興趣的同學自己看看。

總結

我們這裡分析了 fasthttp 的實現原理,通過原理我們可以知道 fasthttp 和 net/http 在實現上面有什麼差異,從而大致得出 fasthttp 快的原因,然後再從它的實現細節知道它在實現上是如何做到減少記憶體分配從而提高效能的。