fastHttp服務端處理請求的過程
阿新 • • 發佈:2021-02-01
1. Github 地址 https://github.com/valyala/fasthttp
2. fastHttp 服務端的處理請求的過程
1. 工作過程
![fastHttp服務端工作過程](https://img2020.cnblogs.com/blog/947592/202102/947592-20210201193454874-939136752.jpg)
2. 主要程式碼
1. 設定監聽地址 server.go
```go
func (s *Server) ListenAndServe(addr string) error {
ln, err := net.Listen("tcp4", addr)
if err != nil {
return err
}
if tcpln, ok := ln.(*net.TCPListener); ok {
return s.Serve(tcpKeepaliveListener{
TCPListener: tcpln,
keepalive: s.TCPKeepalive,
keepalivePeriod: s.TCPKeepalivePeriod,
})
}
return s.Serve(ln)
}
```
2. 初始化協程池並啟動,當接收到請求後,調 wp.Serve 交給協程池去處理請求 server.go
```go
func (s *Server) Serve(ln net.Listener) error {
.....
wp := &workerPool{
WorkerFunc: s.serveConn,
MaxWorkersCount: maxWorkersCount,
LogAllErrors: s.LogAllErrors,
Logger: s.logger(),
connState: s.setState,
}
wp.Start()
for {
if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
wp.Stop()
return err
}
s.setState(c, StateNew)
atomic.AddInt32(&s.open, 1)
if !wp.Serve(c) {
atomic.AddInt32(&s.open, -1)
s.writeFastError(c, StatusServiceUnavailable,
"The connection cannot be served because Server.Concurrency limit exceeded")
c.Close()
s.setState(c, StateClosed)
}
}
......
}
```
3. 獲取請求的控制代碼 server.go
```go
func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
for {
c, err := ln.Accept()
......
return c, nil
}
}
```
4. 協程池處理請求 workerPool.go
```go
// 從協程池中獲取某個協程對應的 控制代碼channel,然後將 3 中獲取到的 請求控制代碼推入channel
func (wp *workerPool) Serve(c net.Conn) bool {
ch := wp.getCh()
if ch == nil {
return false
}
ch.ch <- c
return true
}
// 這個是協程池工作最重要的部分,獲取協程池中協程對應的channel
func (wp *workerPool) getCh() *workerChan {
var ch *workerChan
createWorker := false
wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
// 獲取協程控制代碼channel失敗,如果可以新建協程的話就進行新建,如果不可用新建的話,返回的控制代碼channel為nil,本次請求被拒絕服務
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
// 獲取協程控制代碼channel 成功
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()
if ch == nil {
if !createWorker {
return nil
}
// 新建協程控制代碼,且為之建立協程
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return ch
}
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
var err error
for c = range ch.ch {
//協程的控制代碼channel 出現nil的時候 當前協程就退出了
if c == nil {
break
}
// wp.WorkerFunc是在初始化協程池的時候註冊的方法,是 s.serveConn
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
......
}
}
......
if !wp.release(ch) {
break
}
}
.....
// 協程的控制代碼channel 出現nil的時候 當前協程就退出了,退出後將work協程數自減
wp.workersCount--
......
}
// 請求處理完之後 將當前協程的控制代碼channel 放入協程池的ready切片中,待下次獲取協程的控制代碼channel的時候進行復用(複用這個控制代碼channel就相當於複用了對應的協程)
func (wp *workerPool) release(ch *workerChan) bool {
ch.lastUseTime = time.Now()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
return false
}
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
return true
}
```
5. 實際處理請求的方法 server.go
```go
func (s *Server) serveConn(c net.Conn) (err error) {
.....
//上下文物件複用並初始化上下文
ctx := s.acquireCtx(c)
......
//執行在初始化server的時候自定義的 邏輯
if continueReadingRequest {
s.Handler(ctx)
}
.....
// 處理http響應
}
```
3. 高效的原因
1. fastHttp 協程池 詳情見 2.2.2 、2.2.4
2. fastHttp 物件複用
1. 協程channle物件 複用 詳見2.2.4 的 getCh() 函式
2. 上下文物件複用 詳見2.2.5
注:
本文為自己對fastHttp的理解,如有疑問,歡迎一起討論交流
如需轉載請註明出處:https://www.cnblogs.com/zhuchenglin/p/14358