Codis原始碼解析——proxy的啟動
proxy啟動的時候,首先檢查輸入的命令列,一般情況下,啟動proxy的命令如下:
nohup ./bin/codis-proxy --ncpu=2 --config=./conf/proxy.conf --log=./logs/proxy.log --log-level=WARN &
程式會解析這行命令引數,下面舉個例子(例項程式碼是cmd/proxy/main.go),有關於go的並行,這裡要特別說明一下,如果不顯示的指明GOMAXPROCS的話,goroutine都是執行在同一個CPU核心上,一個goroutine得到時間片的時候,其他的goroutine都在等待。所以還是要根據輸入手動指定一下的。
d, err := docopt.Parse(usage, nil, true, "", false)
var ncpu int
if n, ok := utils.ArgumentInteger(d, "--ncpu"); ok {
ncpu = n
} else {
ncpu = 4
}
runtime.GOMAXPROCS(ncpu)
config制定了配置檔案的目錄,程式會讀取配置檔案中的配置項,填充到pkg/proxy/config.go中的config struct中。
當所有config屬性填充完畢之後,呼叫pkg/proxy/proxy.go中的構造方法,得到一個Proxy,首先我們看看proxy這個struct的結構
type Proxy struct {
mu sync.Mutex
xauth string
//不要搞混了,這個是/pkg/models/proxy.go下面的Proxy struct,也就是proxy成功建立之後,
//在zookeeper的/codis3/codis-wujiang/proxy/proxy-token中展示的資訊
model *models.Proxy
//接收退出資訊的channel
exit struct {
C chan struct{}
}
online bool
closed bool
config *Config
//Router中儲存了叢集中所有sharedBackendConnPool和slot,用於將redis請求轉發給相應的slot進行處理
router *Router
ignore []byte
//監聽proxy的19000埠的Listener,也就是proxy實際工作的埠
lproxy net.Listener
//監聽proxy_admin的11080埠的Listener,也就是codis叢集和proxy進行互動的埠
ladmin net.Listener
ha struct {
//上帝視角sentinel,並不是真正的物理伺服器
//s := &Sentinel{Product: product, Auth: auth}
//s.Context, s.Cancel = context.WithCancel(context.Background())
monitor *redis.Sentinel
//int為groupId,標示每個group的主伺服器
masters map[int]string
//當前叢集中的所有物理sentinel
servers []string
}
//java客戶端Jodis與codis叢集互動,就是通過下面的struct,裡面儲存了zkClient以及"/jodis/codis-wujiang/proxy-token"這個路徑
jodis *Jodis
}
當然,重點還是得到proxy的這個方法。這個方法是proxy啟動過程中最重要的一步,內容也很多,初次看會比較頭疼
func New(config *Config) (*Proxy, error) {
//config的引數校驗
if err := config.Validate(); err != nil {
return nil, errors.Trace(err)
}
if err := models.ValidateProduct(config.ProductName); err != nil {
return nil, errors.Trace(err)
}
//新建一個Proxy,後面就是它填充屬性
s := &Proxy{}
s.config = config
s.exit.C = make(chan struct{})
//通過config new一個Router,這一步只是初始化了Router中的兩個sharedBackendConnPool的結構,
//也就是map[string]*sharedBackendConn
s.router = NewRouter(config)
s.ignore = make([]byte, config.ProxyHeapPlaceholder.Int64())
s.model = &models.Proxy{
StartTime: time.Now().String(),
}
s.model.ProductName = config.ProductName
s.model.DataCenter = config.ProxyDataCenter
//獲得當前程序的程序號
s.model.Pid = os.Getpid()
//返回路徑的字串
s.model.Pwd, _ = os.Getwd()
//獲取當前作業系統資訊和主機名
if b, err := exec.Command("uname", "-a").Output(); err != nil {
log.WarnErrorf(err, "run command uname failed")
} else {
s.model.Sys = strings.TrimSpace(string(b))
}
s.model.Hostname = utils.Hostname
//將config中的引數設定到models.proxy裡,主要設定的引數是admin_addr,proxy_addr,設定之後呼叫net.Listen進行監聽,成功的話將Listener存入proxy,生成Token和xauth,以及proxy.Jodis。這裡也設定了zk最終的樹形路徑
//如果配置檔案中的jodis_compatible設定的是false,就採用codis3的zk路徑,jodis/codis-demo/proxy-token。如果相容這裡設定的true,就還採用codis2時期的/zk/codis/db_productName
if err := s.setup(config); err != nil {
s.Close()
return nil, err
}
//到這一步,其實proxy已經新建完畢,控制檯打印出新建的proxy資訊
log.Warnf("[%p] create new proxy:\n%s", s, s.model.Encode())
unsafe2.SetMaxOffheapBytes(config.ProxyMaxOffheapBytes.Int64())
//新建一個路由表,對傳送到11080埠的請求做處理
go s.serveAdmin()
//啟動goroutine來監聽傳送到19000埠的redis請求
go s.serveProxy()
s.startMetricsJson()
s.startMetricsInfluxdb()
s.startMetricsStatsd()
return s, nil
}
models.Proxy的詳細資訊會掛載在zk目錄”/codis3/codis-wujiang/proxy/proxy-token”下
{
"id": 1,
"token": "0317b8f67921f8c7a2d19d372cc9511b",
"start_time": "2017-07-28 14:44:36.462306337 +0800 CST",
"admin_addr": "*.*.*.*:11080",
"proto_type": "tcp4",
"proxy_addr": "*.*.*.*:19000",
"jodis_path": "/jodis/codis-wujiang/proxy-0317b8f67921f8c7a2d19d372cc9511b",
"product_name": "codis-wujiang",
"pid": 45092,
"pwd": "/app/codis",
"sys": "Linux cnsz22vla888.novalocal 2.6.32-504.el6.x86_64 #1 SMP Wed Oct 15 04:27:16 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux",
"hostname": "cnsz22vla888.novalocal",
"datacenter": ""
}
路由表那裡,著重介紹一下。路由表本質上其實就是一個map,key是路徑 ==> “/”,value是該路徑所對應的處理函式 ==> 在我們這裡就是newApiServer(s)出來的處理函式。codis-proxy啟動之後,這一步是啟動http服務,當叢集配置命令請求向prxoy_admin的11080埠發過來之後,轉發做相應處理。newApiServer方法使用了這裡使用了martini框架。有關於martini框架的詳細資訊,可以參見http://www.oschina.net/p/martini/
func (s *Proxy) serveAdmin() {
if s.IsClosed() {
return
}
defer s.Close()
log.Warnf("[%p] admin start service on %s", s, s.ladmin.Addr())
eh := make(chan error, 1)
go func(l net.Listener) {
//新建路由表
h := http.NewServeMux()
//這裡表示newApiServer用來處理所有"/"路徑的請求
h.Handle("/", newApiServer(s))
hs := &http.Server{Handler: h}
//對每個net.Listener的連線,新建goroutine讀請求,並呼叫srv.Handler進行處理
eh <- hs.Serve(l)
}(s.ladmin)
select {
case <-s.exit.C:
log.Warnf("[%p] admin shutdown", s)
case err := <-eh:
log.ErrorErrorf(err, "[%p] admin exit on error", s)
}
}
//在/pkg/proxy/proxy_api.go中,請求轉發到不同的路徑,r是路由規則,最終的返回結果是由路由規則得到的handler
func newApiServer(p *Proxy) http.Handler {
m := martini.New()
m.Use(martini.Recovery())
m.Use(render.Renderer())
m.Use(func(w http.ResponseWriter, req *http.Request, c martini.Context) {
path := req.URL.Path
if req.Method != "GET" && strings.HasPrefix(path, "/api/") {
var remoteAddr = req.RemoteAddr
var headerAddr string
for _, key := range []string{"X-Real-IP", "X-Forwarded-For"} {
if val := req.Header.Get(key); val != "" {
headerAddr = val
break
}
}
log.Warnf("[%p] API call %s from %s [%s]", p, path, remoteAddr, headerAddr)
}
c.Next()
})
m.Use(gzip.All())
m.Use(func(c martini.Context, w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
})
api := &apiServer{proxy: p}
r := martini.NewRouter()
r.Get("/", func(r render.Render) {
r.Redirect("/proxy")
})
r.Any("/debug/**", func(w http.ResponseWriter, req *http.Request) {
http.DefaultServeMux.ServeHTTP(w, req)
})
r.Group("/proxy", func(r martini.Router) {
r.Get("", api.Overview)
r.Get("/model", api.Model)
r.Get("/stats", api.StatsNoXAuth)
r.Get("/slots", api.SlotsNoXAuth)
})
r.Group("/api/proxy", func(r martini.Router) {
r.Get("/model", api.Model)
r.Get("/xping/:xauth", api.XPing)
r.Get("/stats/:xauth", api.Stats)
r.Get("/stats/:xauth/:flags", api.Stats)
r.Get("/slots/:xauth", api.Slots)
r.Put("/start/:xauth", api.Start)
r.Put("/stats/reset/:xauth", api.ResetStats)
r.Put("/forcegc/:xauth", api.ForceGC)
r.Put("/shutdown/:xauth", api.Shutdown)
r.Put("/loglevel/:xauth/:value", api.LogLevel)
r.Put("/fillslots/:xauth", binding.Json([]*models.Slot{}), api.FillSlots)
r.Put("/sentinels/:xauth", binding.Json(models.Sentinel{}), api.SetSentinels)
r.Put("/sentinels/:xauth/rewatch", api.RewatchSentinels)
})
m.MapTo(r, (*martini.Routes)(nil))
m.Action(r.Handle)
return m
}
後面的serveProxy,s.acceptConn(l)是啟動goroutine來監聽redis請求,啟動的時候肯定是沒有請求過來的,所以我們看到這裡為止, 下一節Codis原始碼解析——proxy監聽redis請求會詳細紹NewSession(c, s.config).Start(s.router)這裡具體做了什麼。
func (s *Proxy) serveProxy() {
if s.IsClosed() {
return
}
defer s.Close()
log.Warnf("[%p] proxy start service on %s", s, s.lproxy.Addr())
eh := make(chan error, 1)
go func(l net.Listener) (err error) {
defer func() {
eh <- err
}()
for {
//啟動goroutine監聽19000埠請求,有請求到來的時候,返回net.Conn
c, err := s.acceptConn(l)
if err != nil {
return err
}
NewSession(c, s.config).Start(s.router)
}
}(s.lproxy)
if d := s.config.BackendPingPeriod.Duration(); d != 0 {
go s.keepAlive(d)
}
select {
case <-s.exit.C:
log.Warnf("[%p] proxy shutdown", s)
case err := <-eh:
log.ErrorErrorf(err, "[%p] proxy exit on error", s)
}
}
最後面的三個方法,由於通常啟動的時候,配置檔案中的MetircsReporterServer,MetircsInfluxdbServer和MetircsStatsdPeriod三個欄位都為空字串,實際上什麼都沒做。
再回到主main函式,看看最後的一段程式碼
//啟動時dashboard、coordinator.name和slots這三個引數都為空,所以此時這三個goroutine都沒有執行
switch {
case dashboard != "":
go AutoOnlineWithDashboard(s, dashboard)
case coordinator.name != "":
go AutoOnlineWithCoordinator(s, coordinator.name, coordinator.addr)
case slots != nil:
go AutoOnlineWithFillSlots(s, slots)
}
//未關閉,但也不線上的時候,控制檯每秒輸出日誌
for !s.IsClosed() && !s.IsOnline() {
log.Warnf("[%p] proxy waiting online ...", s)
time.Sleep(time.Second)
}
到這裡,整個proxy啟動過程結束。啟動之後,Proxy會預設處於 waiting 狀態,以一秒一次的頻率重新整理狀態,監聽proxy_addr 地址(預設配置檔案中的19000埠),但是不會 accept 連線,通過fe或者命令列新增到叢集並完成叢集狀態的同步,才能改變狀態為online。新增proxy到叢集的過程,詳見個人另一篇部落格Codis原始碼解析——proxy新增到叢集
到這裡,我們總結一下,proxy啟動過程中的流程:
讀取配置檔案,獲取Config物件。根據Config新建Proxy,填充Proxy的各個屬性,這裡面比較重要的是填充models.Proxy(詳細資訊可以在zk中檢視),並且與zk連線、註冊相關路徑。啟動goroutine監聽11080埠的codis叢集發過來的請求並進行轉發,以及監聽發到19000埠的redis請求並進行相關處理。