1. 程式人生 > 其它 >endless 如何實現不停機重啟 Go 程式?

endless 如何實現不停機重啟 Go 程式?

轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com/archives/584

前幾篇文章講解了如何實現一個高效的 HTTP 服務,這次我們來看一下如何實現一個永不不停機的 Go 程式。

前提

事情是這樣的,在一天風和日麗的週末,我正在看 TiDB 原始碼的時候,有一位胖友找到我說,Go 是不是每次修改都需要重啟才行?由於我才疏學淺不知道有不停機重啟這個東西,所以回答是的。然後他說,那完全沒有 PHP 好用啊,PHP 修改邏輯完之後直接替換一個檔案就可以實現釋出,不需要重啟。我當時只能和他說可以多 Pod 部署,金絲雀釋出等等也可以做到整個服務不停機發布。但是他最後還是帶著得以意笑容離去。

當時看著他離去的身影我就發誓,我要研究一下 Go 語言的不停機重啟,證明不是 Go 不行,而是我不行 [DOGE] [DOGE] [DOGE],所以就有了這麼一篇文章。

那麼對於一個不停機重啟 Go 程式我們需要解決以下兩個問題:

  1. 程序重啟不需要關閉監聽的埠;
  2. 既有請求應當完全處理或者超時;

後面我們會看一下 endless 是如何做到這兩點的。

基本概念

下面先簡單介紹一下兩個知識點,以便後面的開展

訊號處理

Go 訊號通知通過在 Channel 上傳送 os.Signal 值來工作。如我們如果使用 Ctrl+C,那麼會觸發 SIGINT 訊號,作業系統會中斷該程序的正常流程,並進入相應的訊號處理函式執行操作,完成後再回到中斷的地方繼續執行。

func main() {
    sigs := make(chan os.Signal, 1)
    done := make(chan bool, 1)
    // 監聽訊號
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) 
    go func() {
        // 接收到訊號返回
        sig := <-sigs
        fmt.Println()
        fmt.Println(sig)
        done <- true
    }() 
    fmt.Println("awaiting signal")
    // 等待訊號的接收
    <-done
    fmt.Println("exiting")
}

通過上述簡單的幾行程式碼,我們就可以監聽 SIGINT 和 SIGTERM 訊號。當 Go 接收到作業系統傳送過來的訊號,那麼會將訊號值放入到 sigs 管道中進行處理。

Fork 子程序

在Go語言中 exec 包為我們很好的封裝好了 Fork 呼叫,並且使用它可以使用 ExtraFiles 很好的繼承父程序已開啟的檔案。

file := netListener.File() // this returns a Dup()
path := "/path/to/executable"
args := []string{
    "-graceful"}
// 產生 Cmd 例項
cmd := exec.Command(path, args...)
// 標準輸出
cmd.Stdout = os.Stdout
// 標準錯誤輸出
cmd.Stderr = os.Stderr
cmd.ExtraFiles = []*os.File{file}
// 啟動命令
err := cmd.Start()
if err != nil {
    log.Fatalf("gracefulRestart: Failed to launch, error: %v", err)
}

通過呼叫 exec 包的 Command 命令傳入 path(將要執行的命令路徑)、args (命令的引數)即可返回 Cmd 例項,通過 ExtraFiles 欄位指定額外被新程序繼承的已開啟檔案,最後呼叫 Start 方法建立子程序。

這裡的 netListener.File會通過系統呼叫 dup 複製一份 file descriptor 檔案描述符。

func Dup(oldfd int) (fd int, err error) {
	r0, _, e1 := Syscall(SYS_DUP, uintptr(oldfd), 0, 0)
	fd = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}

我們可以看到 dup 的命令介紹:

dup and dup2 create a copy of the file descriptor oldfd.
After successful return of dup or dup2, the old and new descriptors may
be used interchangeably. They share locks, file position pointers and
flags; for example, if the file position is modified by using lseek on
one of the descriptors, the position is also changed for the other.

The two descriptors do not share the close-on-exec flag, however.

通過上面的描述可以知道,返回的新檔案描述符和引數 oldfd 指向同一個檔案,共享所有的索性、讀寫指標、各項許可權或標誌位等。但是不共享關閉標誌位,也就是說 oldfd 已經關閉了,也不影響寫入新的資料到 newfd 中。

上圖顯示了fork一個子程序,子程序複製父程序的檔案描述符表。

endless 不停機重啟示例

我這裡稍微寫一下 endless 的使用示例給沒有用過 endless 的同學看看,熟悉 endless 使用的同學可以跳過。

import (
	"log"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/fvbock/endless"
	"github.com/gorilla/mux"
)

func handler(w http.ResponseWriter, r *http.Request) {
	duration, err := time.ParseDuration(r.FormValue("duration"))
	if err != nil {
		http.Error(w, err.Error(), 400)
		return
	}
	time.Sleep(duration)
	w.Write([]byte("Hello World"))
}

func main() {
	mux1 := mux.NewRouter()
	mux1.HandleFunc("/sleep", handler)

	w := sync.WaitGroup{}
	w.Add(1)
	go func() {
		err := endless.ListenAndServe("127.0.0.1:5003", mux1)
		if err != nil {
			log.Println(err)
		}
		log.Println("Server on 5003 stopped")
		w.Done()
	}()
	w.Wait()
	log.Println("All servers stopped. Exiting.")

	os.Exit(0)
}

下面驗證一下 endless 建立的不停機服務:

# 第一次構建專案
go build main.go
# 執行專案,這時就可以做內容修改了
./endless &
# 請求專案,60s後返回
curl "http://127.0.0.1:5003/sleep?duration=60s" &
# 再次構建專案,這裡是新內容
go build main.go
# 重啟,17171為pid
kill -1 17171
# 新API請求
curl "http://127.0.0.1:5003/sleep?duration=1s" 

執行完上面的命令我們可以看到,對於第一個請求返回的是:Hello world,在傳送第二個請求之前,我將 handler 裡面的返回值改成了:Hello world2222,然後進行構建重啟。

由於我設定了 60s 才返回第一個請求,第二個請求設定的是 1s 返回,所以這裡會先返回第二個請求的值,然後再返回第一個請求的值。

整個時間線如下所示:

並且在等待第一個請求返回期間,可以看到同時有兩個程序在跑:

$ ps -ef |grep main
root      84636  80539  0 22:25 pts/2    00:00:00 ./main
root      85423  84636  0 22:26 pts/2    00:00:00 ./main

在第一個請求響應之後,我們再看程序可以發現父程序已經關掉了,實現了父子程序無縫切換:

$ ps -ef |grep main
root      85423      1  0 22:26 pts/2    00:00:00 ./main

實現原理

在實現上,我這裡用的是 endless 的實現方案,所以下面原理和程式碼都通過它的程式碼進行講解。

我們要做的不停機重啟,實現原理如上圖所示:

  1. 監聽 SIGHUP 訊號;
  2. 收到訊號時 fork 子程序(使用相同的啟動命令),將服務監聽的 socket 檔案描述符傳遞給子程序;
  3. 子程序監聽父程序的 socket,這個時候父程序和子程序都可以接收請求;
  4. 子程序啟動成功之後傳送 SIGTERM 訊號給父程序,父程序停止接收新的連線,等待舊連線處理完成(或超時);
  5. 父程序退出,升級完成;

程式碼實現

我們從上面的示例可以看出,endless 的入口是 ListenAndServe 函式:

 func ListenAndServe(addr string, handler http.Handler) error {
    // 初始化 server
	server := NewServer(addr, handler)
    // 監聽以及處理請求
	return server.ListenAndServe()
}

這個方法分為兩部分,先是初始化 server,然後再監聽以及處理請求。

初始化 Server

我們首先看一下一個 endless 服務的 Server 結構體是怎樣:

type endlessServer struct {
	// 用於繼承 http.Server 結構
	http.Server
	// 監聽客戶端請求的 Listener
	EndlessListener  net.Listener  
	// 用於記錄還有多少客戶端請求沒有完成
	wg               sync.WaitGroup
	// 用於接收訊號的管道
	sigChan          chan os.Signal
	// 用於重啟時標誌本程序是否是為一個新程序
	isChild          bool
	// 當前程序的狀態
	state            uint8 
    ...
}

這個 endlessServer 除了繼承 http.Server 所有欄位以外,因為還需要監聽訊號以及判斷是不是一個新的程序,所以添加了幾個狀態位的欄位:

  • wg:標記還有多少客戶端請求沒有完成;
  • sigChan:用於接收訊號的管道;
  • isChild:用於重啟時標誌本程序是否是為一個新程序;
  • state:當前程序的狀態。

下面我們看看如何初始化 endlessServer :

func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()
 	
    socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")
	// 根據環境變數判斷是不是子程序
	isChild = os.Getenv("ENDLESS_CONTINUE") != "" 
    // 由於支援多 server,所以這裡需要設定一下 server 的順序
    if len(socketOrder) > 0 {
		for i, addr := range strings.Split(socketOrder, ",") {
			socketPtrOffsetMap[addr] = uint(i)
		}
	} else {
		socketPtrOffsetMap[addr] = uint(len(runningServersOrder))
	}

	srv = &endlessServer{
		wg:      sync.WaitGroup{},
		sigChan: make(chan os.Signal),
		isChild: isChild,
		...
		state: STATE_INIT,
		lock:  &sync.RWMutex{},
	}

	srv.Server.Addr = addr
	srv.Server.ReadTimeout = DefaultReadTimeOut
	srv.Server.WriteTimeout = DefaultWriteTimeOut
	srv.Server.MaxHeaderBytes = DefaultMaxHeaderBytes
	srv.Server.Handler = handler
 
	runningServers[addr] = srv
	...
	return
}

這裡初始化都是我們在 net/http 裡面看到的一些常見的引數,包括 ReadTimeout 讀取超時時間、WriteTimeout 寫入超時時間、Handler 請求處理器等,不熟悉的可以看一下這篇:《 一文說透 Go 語言 HTTP 標準庫 https://www.luozhiyun.com/archives/561 》。

需要注意的是,這裡是通過 ENDLESS_CONTINUE 環境變數來判斷是否是個子程序,這個環境變數會在 fork 子程序的時候寫入。因為 endless 是支援多 server 的,所以需要用 ENDLESS_SOCKET_ORDER變數來判斷一下 server 的順序。

ListenAndServe

func (srv *endlessServer) ListenAndServe() (err error) {
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	// 非同步處理訊號量
	go srv.handleSignals()
	// 獲取埠監聽
	l, err := srv.getListener(addr)
	if err != nil {
		log.Println(err)
		return
	}
	// 將監聽轉為 endlessListener
	srv.EndlessListener = newEndlessListener(l, srv)

	// 如果是子程序,那麼傳送 SIGTERM 訊號給父程序
	if srv.isChild {
		syscall.Kill(syscall.Getppid(), syscall.SIGTERM)
	}

	srv.BeforeBegin(srv.Addr)
	// 響應Listener監聽,執行對應請求邏輯
	return srv.Serve()
}

這個方法其實和 net/http 庫是比較像的,首先獲取埠監聽,然後呼叫 Serve 處理請求傳送過來的資料,大家可以開啟文章《 一文說透 Go 語言 HTTP 標準庫 https://www.luozhiyun.com/archives/561 》對比一下和 endless 的異同。

但是還是有幾點不一樣的,endless 為了做到平滑重啟需要用到訊號監聽處理,並且在 getListener 的時候也不一樣,如果是子程序需要繼承到父程序的 listen fd,這樣才能做到不關閉監聽的埠。

handleSignals 訊號處理

訊號處理主要是訊號的一個監聽,然後根據不同的訊號迴圈處理。

func (srv *endlessServer) handleSignals() {
	var sig os.Signal
	// 註冊訊號監聽
	signal.Notify(
		srv.sigChan,
		hookableSignals...,
	)
	// 獲取pid
	pid := syscall.Getpid()
	for {
		sig = <-srv.sigChan
		// 在處理訊號之前觸發hook
		srv.signalHooks(PRE_SIGNAL, sig)
		switch sig {
		// 接收到平滑重啟訊號
		case syscall.SIGHUP:
			log.Println(pid, "Received SIGHUP. forking.")
			err := srv.fork()
			if err != nil {
				log.Println("Fork err:", err)
			} 
		// 停機訊號
		case syscall.SIGINT:
			log.Println(pid, "Received SIGINT.")
			srv.shutdown()
		// 停機訊號
		case syscall.SIGTERM:
			log.Println(pid, "Received SIGTERM.")
			srv.shutdown()
		...
		// 在處理訊號之後觸發hook
		srv.signalHooks(POST_SIGNAL, sig)
	}
}

這一部分的程式碼十分簡潔,當我們用kill -1 $pid 的時候這裡 srv.sigChan 就會接收到相應的訊號,並進入到 case syscall.SIGHUP 這塊邏輯程式碼中。

需要注意的是,在上面的 ListenAndServe 方法中子程序會像父程序傳送 syscall.SIGTERM 訊號也會在這裡被處理,執行的是 shutdown 停機邏輯。

在進入到 case syscall.SIGHUP 這塊邏輯程式碼之後會呼叫 fork 函式,下面我們再來看看 fork 邏輯:

func (srv *endlessServer) fork() (err error) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()

	// 校驗是否已經fork過
	if runningServersForked {
		return errors.New("Another process already forked. Ignoring this one.")
	} 
	runningServersForked = true

	var files = make([]*os.File, len(runningServers))
	var orderArgs = make([]string, len(runningServers))
	// 因為有多 server 的情況,所以獲取所有 listen fd
	for _, srvPtr := range runningServers { 
		switch srvPtr.EndlessListener.(type) {
		case *endlessListener: 
			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()
		default: 
			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()
		}
		orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr
	}
	// 環境變數
	env := append(
		os.Environ(),
    // 啟動endless 的時候,會根據這個引數來判斷是否是子程序
		"ENDLESS_CONTINUE=1",
	)
	if len(runningServers) > 1 {
		env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ",")))
	}

	// 程式執行路徑
	path := os.Args[0]
	var args []string
	// 引數
	if len(os.Args) > 1 {
		args = os.Args[1:]
	}

	cmd := exec.Command(path, args...)
	// 標準輸出
	cmd.Stdout = os.Stdout
	// 錯誤
	cmd.Stderr = os.Stderr
	cmd.ExtraFiles = files
	cmd.Env = env  
	err = cmd.Start()
	if err != nil {
		log.Fatalf("Restart: Failed to launch, error: %v", err)
	} 
	return
}

fork 這塊程式碼首先會根據 server 來獲取不同的 listen fd 然後封裝到 files 列表中,然後在呼叫 cmd 的時候將檔案描述符傳入到 ExtraFiles 引數中,這樣子程序就可以無縫託管到父程序監聽的埠。

需要注意的是,env 引數列表中有一個 ENDLESS_CONTINUE 引數,這個引數會在 endless 啟動的時候做校驗:

func NewServer(addr string, handler http.Handler) (srv *endlessServer) {
	runningServerReg.Lock()
	defer runningServerReg.Unlock()

	socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")
	isChild = os.Getenv("ENDLESS_CONTINUE") != ""
  ...
}

下面我們再看看 接收到 SIGTERM 訊號後,shutdown 會怎麼做:

func (srv *endlessServer) shutdown() {
	if srv.getState() != STATE_RUNNING {
		return
	}

	srv.setState(STATE_SHUTTING_DOWN)
    // 預設 DefaultHammerTime 為 60秒
	if DefaultHammerTime >= 0 {
		go srv.hammerTime(DefaultHammerTime)
	}
	// 關閉存活的連線
	srv.SetKeepAlivesEnabled(false)
	err := srv.EndlessListener.Close()
	if err != nil {
		log.Println(syscall.Getpid(), "Listener.Close() error:", err)
	} else {
		log.Println(syscall.Getpid(), srv.EndlessListener.Addr(), "Listener closed.")
	}
}

shutdown 這裡會先將連線關閉,因為這個時候子程序已經啟動了,所以不再處理請求,需要把埠的監聽關了。這裡還會非同步呼叫 srv.hammerTime 方法等待60秒把父程序的請求處理完畢才關閉父程序。

getListener 獲取埠監聽

func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {
	// 如果是子程序
	if srv.isChild {
		var ptrOffset uint = 0
		runningServerReg.RLock()
		defer runningServerReg.RUnlock()
		// 這裡還是處理多個 server 的情況
		if len(socketPtrOffsetMap) > 0 {
			// 根據server 的順序來獲取 listen fd 的序號
			ptrOffset = socketPtrOffsetMap[laddr] 
		}
		// fd 0,1,2是預留給 標準輸入、輸出和錯誤的,所以從3開始
		f := os.NewFile(uintptr(3+ptrOffset), "")
		l, err = net.FileListener(f)
		if err != nil {
			err = fmt.Errorf("net.FileListener error: %v", err)
			return
		}
	} else {
		// 父程序 直接返回 listener
		l, err = net.Listen("tcp", laddr)
		if err != nil {
			err = fmt.Errorf("net.Listen error: %v", err)
			return
		}
	}
	return
}

這裡如果是父程序沒什麼好說的,直接建立一個埠監聽並返回就好了。

但是對於子程序來說是有一些繞,首先說一下 os.NewFile 的引數為什麼要從3開始。因為子程序在繼承父程序的 fd 的時候0,1,2是預留給 標準輸入、輸出和錯誤的,所以父程序給的第一個fd在子程序裡順序排就是從3開始了,又因為 fork 的時候cmd.ExtraFiles 引數傳入的是一個 files,如果有多個 server 那麼會依次從3開始遞增。

如下圖,前三個 fd 是預留給 標準輸入、輸出和錯誤的,fd 3 是根據傳入 ExtraFiles 的陣列列表依次遞增的。

其實這裡我們也可以用開頭的例子做一下試驗:

# 第一次構建專案
go build main.go
# 執行專案,這時就可以做內容修改了
./endless &
# 這個時候我們看看父程序開啟的檔案
lsof  -P -p 17116
COMMAND   PID USER   FD      TYPE  DEVICE SIZE/OFF     NODE NAME
...
main    18942 root    0u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    1u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    2u      CHR   136,2      0t0        5 /dev/pts/2
main    18942 root    3u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
# 請求專案,60s後返回
curl "http://127.0.0.1:5003/sleep?duration=60s" & 
# 重啟,17116為父程序pid
kill -1 17116
# 然後我們看一下 main 程式的程序應該有兩個
ps -ef |grep ./main
root      17116  80539  0 04:19 pts/2    00:00:00 ./main
root      18110  17116  0 04:21 pts/2    00:00:00 ./main
# 可以看到子程序pid 為18110,我們看看該程序開啟的檔案
lsof  -P -p 18110
COMMAND   PID USER   FD      TYPE  DEVICE SIZE/OFF     NODE NAME
...
main    19073 root    0r      CHR     1,3      0t0     1028 /dev/null
main    19073 root    1u      CHR   136,2      0t0        5 /dev/pts/2
main    19073 root    2u      CHR   136,2      0t0        5 /dev/pts/2
main    19073 root    3u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
main    19073 root    4u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)
# 新API請求
curl "http://127.0.0.1:5003/sleep?duration=1s" 

總結

通過上面的介紹,我們通過 endless 學習了在 Go 服務中如何做到不停機也可以重啟服務,相信這個功能在很多場景下都會用到,沒用到的同學也可以嘗試在自己的系統上玩一下。

熱重啟總的來說它允許服務重啟期間,不中斷已經建立的連線,老服務程序不再接受新連線請求,新連線請求將在新服務程序中受理。對於原服務程序中已經建立的連線,也可以將其設為讀關閉,等待平滑處理完連線上的請求及連線空閒後再行退出。

通過這種方式,可以保證已建立的連線不中斷,新的服務程序也可以正常接受連線請求。

Reference

https://goteleport.com/blog/golang-ssh-bastion-graceful-restarts/

https://grisha.org/blog/2014/06/03/graceful-restart-in-golang/

https://stackoverflow.com/questions/28370646/how-do-i-fork-a-go-process/28371586#28371586

https://xixiliguo.github.io/post/golang-exec/

https://github.com/fvbock/endless

https://golang.org/pkg/os/signal/

https://stackoverflow.com/questions/11635219/dup2-dup-why-would-i-need-to-duplicate-a-file-descriptor

http://www.hitzhangjie.pro/blog/2020-08-28-go程式如何實現熱重啟/