1. 程式人生 > 實用技巧 >Codis原始碼分析之環境篇

Codis原始碼分析之環境篇

一、Codis介紹

Codis是豌豆莢開源的Redis叢集方案,github地址:

https://github.com/CodisLabs/codis

以下是官方介紹:

Codis 是一個分散式 Redis 解決方案, 對於上層的應用來說, 連線到 Codis Proxy 和連線原生的 Redis Server 沒有顯著區別 (不支援的命令列表), 上層應用可以像使用單機的 Redis 一樣使用, Codis 底層會處理請求的轉發, 不停機的資料遷移等工作, 所有後邊的一切事情, 對於前面的客戶端來說是透明的, 可以簡單地認為後邊連線的是一個記憶體無限大的 Redis 服務。

https://github.com/CodisLabs/codis/blob/release3.2/doc/tutorial_zh.md

Codis主要解決的是redis的擴充套件和運維問題,因為redis官方以前沒有叢集方案,自從3.0才有,並且剛開始做的比較弱,特別是運維這塊不是很友好,很多都是命令列操作的。我們可以認為Codis是一個可以支援容量可以無限擴大的Redis叢集就行。

二、Codis安裝及配置

1、整體架構

Codis是由golang寫的,因此需要先安裝go語言,官方給的文件已經很詳細了,這裡不詳述。

我們看下Codis的整體架構,基於3.2:

2、相關概念

Codis Server:可以理解為原生的Redis,在整個叢集中充當儲存用,即最終資料是存在Redis中的,不過Codis在上面改了些東西,加了些命令,便於叢集遷移和分片;

Codis Proxy:客戶端連線的 Redis 代理服務, 實現了 Redis 協議。即Proxy只是一個代理,把命令做解析,然後根據路由規則轉到不同的Codis Server中,Proxy是無狀態的,可以無限擴容,Proxy資訊儲存在Zookeeper等Storage(後面再介紹)中,客戶端需要自己重新整理最新的Proxy列表。

Codis Dashboard:這個是Codis的核心,對叢集的絕大部分修改都是它來完成的。官方的介紹如下:叢集管理工具,支援 codis-proxy、codis-server 的新增、刪除,以及據遷移等操作;說的比較籠統,下面我們會通過程式碼來說明其作用。

Codis FE:叢集管理介面,可以理解為UI。

Codis Admin:叢集管理的命令列工具,可用於控制 codis-proxy、codis-dashboard 狀態以及訪問外部儲存。為什麼有了Dashboard,還要Codis Admin,這個工具的主要定位是通過命令列的方式操作,其實後面的邏輯是一樣的,即Codis Admin和Codis Fe是兩種不同的介面,最後的邏輯是一樣的。

Storage:主要儲存元資料,如叢集有多少個 Proxy,當前的分片是怎麼樣的。目前有 Zookeeper、Etcd、Fs等方式的實現。

3、叢集啟動

整個叢集的啟動方式官方也有說明,大體順序如下:

1)、啟動codis-dashboard

/admin/codis-dashboard-admin.sh start

最終啟動的是codis-dashboard這個二進位制檔案:

./bin/codis-dashboard --ncpu=4 --config=dashboard.toml \    --log=dashboard.log --log-level=WARN

主要的引數是--config,即配置檔案,核心幾個配置如下:

coordinator_name = "zookeeper"
coordinator_addr = "127.0.0.1:2181"

# Set Codis Product {Name/Auth}.
product_name = "codis-demo"
product_auth = ""

# Set bind address for admin(rpc), tcp only.
admin_addr = "0.0.0.0:18080"

coordinator_name和coordinator_addr配置的是Storage的型別及地址;

product_name配置的是叢集的名稱,Codis以Product作為一個叢集,每個叢集有不同的Codis Server和Codis Proxy及Dashboard;product_auth為密碼,如果開發環境建議留空,生產上最好是不為空。

2)、啟動codis-proxy

./admin/codis-proxy-admin.sh start

最終啟動的是codis-proxy這個二進位制檔案

./bin/codis-proxy --ncpu=4 --config=proxy.toml \    --log=proxy.log --log-level=WARN &

最主要的是--config引數,核心引數有:

product_name = "codis-demo"
product_auth = ""

# Set bind address for admin(rpc), tcp only.
admin_addr = "0.0.0.0:11080"

product_name和product_auth要和上面的Codis Dashboard對應上

3)啟動codis-server

./admin/codis-server-admin.sh start

4)啟動codis-fe

./admin/codis-fe-admin.sh star

最終啟動的是codis-fe這個二進位制檔案

./bin/codis-fe --ncpu=4 --log=fe.log --log-level=WARN \    --zookeeper=127.0.0.1:2181 --listen=127.0.0.1:8080 &

listen引數比較重要,就是我們要在瀏覽器訪問的地址了。

Codis Admin是命令列不用啟動,每次執行一個命令就退出了。

三、程式碼分析各元件作用

前面講到了Codis的和元件,總體還是比較多的,我剛開始也是一頭霧水,通過看程式碼就比較清晰了。

我們先看下Codis整體程式碼結構:

程式碼大概在以下目錄:

cmd:上面說的二進位制檔案的入口,即main函式所在檔案,這個下面不同命令有不同的資料夾,都叫main.go。

pkg:核心的業務邏輯

vendor:主要是存放第三方程式碼庫

1)、Codis Dashboard


s, err := topom.New(client, config)
  if err != nil {
    log.PanicErrorf(err, "create topom with config file failed\n%s", config)
  }
  
  //省略一些程式碼
  for i := 0; !s.IsClosed() && !s.IsOnline(); i++ {
    //啟動Topom
    if err := s.Start(true); err != nil {
      if i <= 15 {
        log.Warnf("[%p] dashboard online failed [%d]", s, i)
      } else {
        log.Panicf("dashboard online failed, give up & abort :'(")
      }
      time.Sleep(time.Second * 2)
    }
  }

dashboard的初始化邏輯都在上面了,幾個關鍵程式碼是topom.New以及s.Start(true),Start的邏輯以後再細講,今天關注 topom的初始化。

topom.new會啟動一個協程執行:

s.serveAdmin()

這個函式內部會啟動一個apiServer,以下是具體的路由:

r.Get("/", func(r render.Render) {
    r.Redirect("/topom")
  })
  r.Any("/debug/**", func(w http.ResponseWriter, req *http.Request) {
    http.DefaultServeMux.ServeHTTP(w, req)
  })

  r.Group("/topom", func(r martini.Router) {
    r.Get("", api.Overview)
    r.Get("/model", api.Model)
    r.Get("/stats", api.StatsNoXAuth)
    r.Get("/slots", api.SlotsNoXAuth)
  })
  r.Group("/api/topom", func(r martini.Router) {
    r.Get("/model", api.Model)
    r.Get("/xping/:xauth", api.XPing)
    r.Get("/stats/:xauth", api.Stats)
    r.Get("/slots/:xauth", api.Slots)
    r.Put("/reload/:xauth", api.Reload)
    r.Put("/shutdown/:xauth", api.Shutdown)
    r.Put("/loglevel/:xauth/:value", api.LogLevel)
    
    r.Group("/proxy", func(r martini.Router) {
      r.Put("/create/:xauth/:addr", api.CreateProxy)
      r.Put("/online/:xauth/:addr", api.OnlineProxy)
      r.Put("/reinit/:xauth/:token", api.ReinitProxy)
      r.Put("/remove/:xauth/:token/:force", api.RemoveProxy)
    })
    r.Group("/group", func(r martini.Router) {
      r.Put("/create/:xauth/:gid", api.CreateGroup)
      r.Put("/remove/:xauth/:gid", api.RemoveGroup)
      r.Put("/resync/:xauth/:gid", api.ResyncGroup)
      r.Put("/resync-all/:xauth", api.ResyncGroupAll)
      r.Put("/add/:xauth/:gid/:addr", api.GroupAddServer)
      r.Put("/add/:xauth/:gid/:addr/:datacenter", api.GroupAddServer)
      r.Put("/del/:xauth/:gid/:addr", api.GroupDelServer)
      r.Put("/promote/:xauth/:gid/:addr", api.GroupPromoteServer)
      r.Put("/replica-groups/:xauth/:gid/:addr/:value", api.EnableReplicaGroups)
      r.Put("/replica-groups-all/:xauth/:value", api.EnableReplicaGroupsAll)
      r.Group("/action", func(r martini.Router) {
        r.Put("/create/:xauth/:addr", api.SyncCreateAction)
        r.Put("/remove/:xauth/:addr", api.SyncRemoveAction)
      })
      r.Get("/info/:addr", api.InfoServer)
    })
    r.Group("/slots", func(r martini.Router) {
      r.Group("/action", func(r martini.Router) {
        r.Put("/create/:xauth/:sid/:gid", api.SlotCreateAction)
        r.Put("/create-some/:xauth/:src/:dst/:num", api.SlotCreateActionSome)
        r.Put("/create-range/:xauth/:beg/:end/:gid", api.SlotCreateActionRange)
        r.Put("/remove/:xauth/:sid", api.SlotRemoveAction)
        r.Put("/interval/:xauth/:value", api.SetSlotActionInterval)
        r.Put("/disabled/:xauth/:value", api.SetSlotActionDisabled)
      })
      r.Put("/assign/:xauth", binding.Json([]*models.SlotMapping{}), api.SlotsAssignGroup)
      r.Put("/assign/:xauth/offline", binding.Json([]*models.SlotMapping{}), api.SlotsAssignOffline)
      r.Put("/rebalance/:xauth/:confirm", api.SlotsRebalance)
    })
    r.Group("/sentinels", func(r martini.Router) {
      r.Put("/add/:xauth/:addr", api.AddSentinel)
      r.Put("/del/:xauth/:addr/:force", api.DelSentinel)
      r.Put("/resync-all/:xauth", api.ResyncSentinels)
      r.Get("/info/:addr", api.InfoSentinel)
      r.Get("/info/:addr/monitored", api.InfoSentinelMonitored)
    })
  })

可以看到dashboard有以下幾塊功能:

proxy管理:即/proxy開頭的請求

group管理及slots管理,還有一些統計資訊。

2)、Codis Fe

入口為cmd/fe下面的main.go,核心就啟動2個路由處理:

r.Get("/list", func() (int, string) {
    names := router.GetNames()
    sort.Sort(sort.StringSlice(names))
    return rpc.ApiResponseJson(names)
  })

  r.Any("/**", func(w http.ResponseWriter, req *http.Request) {
    name := req.URL.Query().Get("forward")
    if p := router.GetProxy(name); p != nil {
      p.ServeHTTP(w, req)
    } else {
      w.WriteHeader(http.StatusForbidden)
    }
  })

其中/list請求是進入Codis Fe主介面就會呼叫的,用來展示當前有多少叢集的;

還有一個路由是轉發的,即如果引數中有forward引數,會把請求轉到相應的Proxy來處理。

可以看到Proxy就兩功能:一是顯示所有Product Name,即當前有多少叢集,第二就是請求轉發,即將請求轉到具體的Proxy。

3)、Codis Proxy

入口檔案在 cmd/proxy/main.go,

s, err := proxy.New(config)

proxy.New會啟動幾個協程來處理任務:

go s.serveAdmin()
  go s.serveProxy()
  //下面是一些統計資訊
  s.startMetricsJson()
  s.startMetricsInfluxdb()
  s.startMetricsStatsd()

重點關注前面2個,特別是第2個,s.serveProxy:


go func(l net.Listener) (err error) {
    defer func() {
      eh <- err
    }()
    for {
      c, err := s.acceptConn(l)
      if err != nil {
        return err
      }
      NewSession(c, s.config).Start(s.router)
    }
  }(s.lproxy)

可以看到,Proxy就是不斷的accept連線,然後啟動一個Session。

總結下:Proxy的作用是處理客戶端的連線,然後為每個連線建立一個Session,再根據客戶端具體輸入的命令進行處理,具體細節後續再聊。