Codis原始碼解析——dashboard的啟動(1)
dashboard是codis的叢集管理工具,支援proxy和server的新增、刪除、資料遷移,所有對叢集的操作必須通過dashboard。dashboard的啟動過程和proxy類似。dashboard的啟動只是初始化一些必要的資料結構,複雜的在於對叢集的操作,這個日後的文章會有詳細的描述,本文先不管這些。
啟動的時候,首先讀取配置檔案,填充config資訊。根據coordinator的資訊,如果是zookeeper而不是etcd的話,就建立一個zk客戶端。然後根據client和config建立一個topom。首先來看看topom中有哪些資訊。這個類在/pkg/topom/topom.go中。Topom非常重要,這個結構裡面儲存了叢集中某一時刻所有的節點資訊,在深入codis的過程中我們會逐步看到
type Topom struct {
mu sync.Mutex
//初始化之後,這個屬性中的資訊可以在zk中看到,就像models.Proxy一樣
//路徑是/codis3/codis-wujiang/topom
model *models.Topom
//儲存著zkClient以及product-name,Topom與zk互動都是通過這個store
store *models.Store
//快取結構,如果快取為空就通過store從zk中取出slot的資訊並填充cache
//不是隻有第一次啟動的時候cache會為空,如果叢集中的元素(server,slot等等)發生變化,都會呼叫dirtyCache,將cache中的資訊置為nil
//這樣下一次呼叫s.newContext()獲取上下文資訊獲取上下文資訊的時候,就會通過Topom.store從zk中重新拉取
cache struct {
hooks list.List
slots []*models.SlotMapping
group map[int]*models.Group
proxy map[string]*models.Proxy
sentinel *models.Sentinel
}
exit struct {
C chan struct {}
}
//與dashboard相關的所有配置資訊
config *Config
online bool
closed bool
ladmin net.Listener
//槽進行遷移的時候使用
action struct {
//這個pool,其實就是map[string]*list.List,用於儲存redis的結構,裡面有addr,auth和Timeout。相當於快取,需要的時候從這裡取,否則就新建然後put進來
//鍵為redis伺服器的地址,值為與這臺伺服器建立的連線,過期的連線會被刪除
//timeout為配置檔案dashboard.toml中的migration_timeout選項所配
redisp *redis.Pool
interval atomic2.Int64
disabled atomic2.Bool
progress struct {
status atomic.Value
}
//一個計數器,有一個slot等待遷移,就加一;執行一個slot的遷移,就減一
executor atomic2.Int64
}
//儲存叢集中redis和proxy詳細資訊,goroutine每次重新整理redis和proxy之後,都會將結果存在這裡
stats struct {
//timeout為5秒
redisp *redis.Pool
servers map[string]*RedisStats
proxies map[string]*ProxyStats
}
//這個在使用哨兵的時候會用到,儲存在fe中配置的哨兵以及哨兵所監控的redis主伺服器
ha struct {
//timeout為5秒
redisp *redis.Pool
monitor *redis.Sentinel
masters map[int]string
}
}
建立topom的方法如下所示,這裡傳入的client,是根據coordinator建立的zkClient
func New(client models.Client, config *Config) (*Topom, error) {
//配置檔案校驗
if err := config.Validate(); err != nil {
return nil, errors.Trace(err)
}
if err := models.ValidateProduct(config.ProductName); err != nil {
return nil, errors.Trace(err)
}
s := &Topom{}
s.config = config
s.exit.C = make(chan struct{})
//新建redis pool
s.action.redisp = redis.NewPool(config.ProductAuth, config.MigrationTimeout.Duration())
s.action.progress.status.Store("")
s.ha.redisp = redis.NewPool("", time.Second*5)
s.model = &models.Topom{
StartTime: time.Now().String(),
}
s.model.ProductName = config.ProductName
s.model.Pid = os.Getpid()
s.model.Pwd, _ = os.Getwd()
if b, err := exec.Command("uname", "-a").Output(); err != nil {
log.WarnErrorf(err, "run command uname failed")
} else {
s.model.Sys = strings.TrimSpace(string(b))
}
s.store = models.NewStore(client, config.ProductName)
s.stats.redisp = redis.NewPool(config.ProductAuth, time.Second*5)
s.stats.servers = make(map[string]*RedisStats)
s.stats.proxies = make(map[string]*ProxyStats)
if err := s.setup(config); err != nil {
s.Close()
return nil, err
}
log.Warnf("create new topom:\n%s", s.model.Encode())
go s.serveAdmin()
return s, nil
}
新建redis pool的方法在/pkg/utils/redis/client.go中。auth如果在配置檔案中沒有設定,就是一個空字串。timeout時間的單位是納秒,配置檔案中預設是30s,也就是3乘以10的10次方納秒。
如果連線池收到退出的訊息,就直接return,並且每隔一分鐘清理連線池中的資料。清理規則是,從當前Pool的pool中,遍歷取出每個pool屬性。前面已經說過,這個pool屬性其實就是map[string]*list.List,從每個list中取出頭一個元素,轉為Client型別,判斷是否還是可再利用的,如果是可再利用的,就重新將該Client放回到佇列的尾部。可再利用的規則是,如果Pool的timeout為0,或者該Client上次距離最近一次被引用到現在的時間小於Pool的timeout,就是可再利用的。
func NewPool(auth string, timeout time.Duration) *Pool {
p := &Pool{
auth: auth, timeout: timeout,
pool: make(map[string]*list.List),
}
p.exit.C = make(chan struct{})
if timeout != 0 {
go func() {
var ticker = time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-p.exit.C:
return
case <-ticker.C:
//每隔一分鐘清理Pool中無效的Client
p.Cleanup()
}
}
}()
}
return p
}
//RedisClient結構,對於每臺redis伺服器,都會有多個連線,過期的連線將會被清除
type Client struct {
conn redigo.Conn
Addr string
Auth string
Database int
//上次使用時間,用於看某個client是否應該被回收
LastUse time.Time
Timeout time.Duration
}
細心的讀者可能已經發現,上一步初始化的redis pool是Topom.action.Pool,在Topom中實際上還有另外兩個池,分別在stats和ha中。可以看一下redis pool此時的結構,在dashboard的初始化過程中,這三個池都只是把基礎的資料結構建好。
與之前初始化Proxy的方式類似,現在我們正在初始化的結構是/pkg/topom/topom.go中的結構,而/pkg/models/topom.go中儲存了系統的相關資訊。接下來幾步,在/pkg/models/topom.go中填充相關資訊,初始化完成之後可以在zk中看到。
建立Topom的最後兩步,監聽、並得到路由handler
//Topom的ladmin監聽配置檔案中的admin_addr,生成Token和Xauth
if err := s.setup(config); err != nil {
s.Close()
return nil, err
}
log.Warnf("create new topom:\n%s", s.model.Encode())
//採用martini框架,得到路由,並從路由得到handler。這一步的原理和proxy的類似,就不再贅述
go s.serveAdmin()
以上兩步的處理方式和proxy的啟動中類似,監聽18080埠(dashboard與codis叢集互動的預設介面),並採用martini框架對傳送過來的請求進行轉發
func (s *Topom) setup(config *Config) error {
if l, err := net.Listen("tcp", config.AdminAddr); err != nil {
return errors.Trace(err)
} else {
s.ladmin = l
x, err := utils.ReplaceUnspecifiedIP("tcp", l.Addr().String(), s.config.HostAdmin)
if err != nil {
return err
}
s.model.AdminAddr = x
}
s.model.Token = rpc.NewToken(
config.ProductName,
s.ladmin.Addr().String(),
)
s.xauth = rpc.NewXAuth(config.ProductName)
return nil
}
func (s *Topom) serveAdmin() {
if s.IsClosed() {
return
}
defer s.Close()
log.Warnf("admin start service on %s", s.ladmin.Addr())
eh := make(chan error, 1)
go func(l net.Listener) {
h := http.NewServeMux()
h.Handle("/", newApiServer(s))
hs := &http.Server{Handler: h}
eh <- hs.Serve(l)
}(s.ladmin)
select {
case <-s.exit.C:
log.Warnf("admin shutdown")
case err := <-eh:
log.ErrorErrorf(err, "admin exit on error")
}
}
func newApiServer(t *Topom) http.Handler {
m := martini.New()
m.Use(martini.Recovery())
m.Use(render.Renderer())
m.Use(func(w http.ResponseWriter, req *http.Request, c martini.Context) {
path := req.URL.Path
if req.Method != "GET" && strings.HasPrefix(path, "/api/") {
var remoteAddr = req.RemoteAddr
var headerAddr string
for _, key := range []string{"X-Real-IP", "X-Forwarded-For"} {
if val := req.Header.Get(key); val != "" {
headerAddr = val
break
}
}
log.Warnf("[%p] API call %s from %s [%s]", t, path, remoteAddr, headerAddr)
}
c.Next()
})
m.Use(gzip.All())
m.Use(func(c martini.Context, w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
})
api := &apiServer{topom: t}
r := martini.NewRouter()
r.Get("/", func(r render.Render) {
r.Redirect("/topom")
})
r.Any("/debug/**", func(w http.ResponseWriter, req *http.Request) {
http.DefaultServeMux.ServeHTTP(w, req)
})
r.Group("/topom", func(r martini.Router) {
r.Get("", api.Overview)
r.Get("/model", api.Model)
r.Get("/stats", api.StatsNoXAuth)
r.Get("/slots", api.SlotsNoXAuth)
})
r.Group("/api/topom", func(r martini.Router) {
r.Get("/model", api.Model)
r.Get("/xping/:xauth", api.XPing)
r.Get("/stats/:xauth", api.Stats)
r.Get("/slots/:xauth", api.Slots)
r.Put("/reload/:xauth", api.Reload)
r.Put("/shutdown/:xauth", api.Shutdown)
r.Put("/loglevel/:xauth/:value", api.LogLevel)
r.Group("/proxy", func(r martini.Router) {
r.Put("/create/:xauth/:addr", api.CreateProxy)
r.Put("/online/:xauth/:addr", api.OnlineProxy)
r.Put("/reinit/:xauth/:token", api.ReinitProxy)
r.Put("/remove/:xauth/:token/:force", api.RemoveProxy)
})
r.Group("/group", func(r martini.Router) {
r.Put("/create/:xauth/:gid", api.CreateGroup)
r.Put("/remove/:xauth/:gid", api.RemoveGroup)
r.Put("/resync/:xauth/:gid", api.ResyncGroup)
r.Put("/resync-all/:xauth", api.ResyncGroupAll)
r.Put("/add/:xauth/:gid/:addr", api.GroupAddServer)
r.Put("/add/:xauth/:gid/:addr/:datacenter", api.GroupAddServer)
r.Put("/del/:xauth/:gid/:addr", api.GroupDelServer)
r.Put("/promote/:xauth/:gid/:addr", api.GroupPromoteServer)
r.Put("/replica-groups/:xauth/:gid/:addr/:value", api.EnableReplicaGroups)
r.Put("/replica-groups-all/:xauth/:value", api.EnableReplicaGroupsAll)
r.Group("/action", func(r martini.Router) {
r.Put("/create/:xauth/:addr", api.SyncCreateAction)
r.Put("/remove/:xauth/:addr", api.SyncRemoveAction)
})
r.Get("/info/:addr", api.InfoServer)
})
r.Group("/slots", func(r martini.Router) {
r.Group("/action", func(r martini.Router) {
r.Put("/create/:xauth/:sid/:gid", api.SlotCreateAction)
r.Put("/create-some/:xauth/:src/:dst/:num", api.SlotCreateActionSome)
r.Put("/create-range/:xauth/:beg/:end/:gid", api.SlotCreateActionRange)
r.Put("/remove/:xauth/:sid", api.SlotRemoveAction)
r.Put("/interval/:xauth/:value", api.SetSlotActionInterval)
r.Put("/disabled/:xauth/:value", api.SetSlotActionDisabled)
})
r.Put("/assign/:xauth", binding.Json([]*models.SlotMapping{}), api.SlotsAssignGroup)
r.Put("/assign/:xauth/offline", binding.Json([]*models.SlotMapping{}), api.SlotsAssignOffline)
r.Put("/rebalance/:xauth/:confirm", api.SlotsRebalance)
})
r.Group("/sentinels", func(r martini.Router) {
r.Put("/add/:xauth/:addr", api.AddSentinel)
r.Put("/del/:xauth/:addr/:force", api.DelSentinel)
r.Put("/resync-all/:xauth", api.ResyncSentinels)
r.Get("/info/:addr", api.InfoSentinel)
r.Get("/info/:addr/monitored", api.InfoSentinelMonitored)
})
})
m.MapTo(r, (*martini.Routes)(nil))
m.Action(r.Handle)
return m
}
topom初始化成功之後,看一下控制檯上列印的日誌。
建立Topom之後,下一步就是建立一個channel,專門用來接收系統signal。這個signal在隨不同的系統而變化。著重說一下signal.Notify方法。第一個引數是channel,後面的可變引數是往channel中寫入的訊號。如果沒有制定任何訊號引數,就預設所有收到的訊號都會寫入channel。
go func() {
defer s.Close()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
//將收到的系統訊號讀取並列印在日誌
sig := <-c
log.Warnf("[%p] dashboard receive signal = '%v'", s, sig)
}()
這樣做的目的是什麼呢?
比方說,當你強行停止掉dashboard程序,console上就會出現日誌
Process finished with exit code 137 (interrupted by signal 9: SIGKILL)
然後呼叫defer s.Close()來刪除dashboard在zk的註冊路徑,下次啟動dashboard就不會報acquire lock of codis-demo failed的錯。
到這裡,Topom建立結束,下一步就是傳入固定引數true,呼叫Topom的啟動方法。
很多人啟動dashboard的時候會報錯acquire lock of codis-demo failed,就是這裡報的錯。意思是說,建立路徑filepath.Join(CodisDir, product, “topom”)錯誤,報了node already exits。通常解決這個問題的方式就是遞迴刪除codis3資料夾的內容,然後重新建立
cd /app/zookeeper-3.4.6/bin/
./zkCli.sh
rmr /codis3
func (s *Topom) Start(routines bool) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return ErrClosedTopom
}
if s.online {
return nil
} else {
//建立zk路徑
if err := s.store.Acquire(s.model); err != nil {
log.ErrorErrorf(err, "store: acquire lock of %s failed", s.config.ProductName)
return errors.Errorf("store: acquire lock of %s failed", s.config.ProductName)
}
s.online = true
}
if !routines {
return nil
}
go func() {
for !s.IsClosed() {
if s.IsOnline() {
//重新整理redis狀態
w, _ := s.RefreshRedisStats(time.Second)
if w != nil {
w.Wait()
}
}
time.Sleep(time.Second)
}
}()
go func() {
for !s.IsClosed() {
if s.IsOnline() {
//重新整理proxy狀態
w, _ := s.RefreshProxyStats(time.Second)
if w != nil {
w.Wait()
}
}
time.Sleep(time.Second)
}
}()
go func() {
for !s.IsClosed() {
if s.IsOnline() {
//處理slot操作
if err := s.ProcessSlotAction(); err != nil {
log.WarnErrorf(err, "process slot action failed")
time.Sleep(time.Second * 5)
}
}
time.Sleep(time.Second)
}
}()
go func() {
for !s.IsClosed() {
if s.IsOnline() {
//處理同步操作
if err := s.ProcessSyncAction(); err != nil {
log.WarnErrorf(err, "process sync action failed")
time.Sleep(time.Second * 5)
}
}
time.Sleep(time.Second)
}
}()
return nil
}
之前我們已經說過,整個叢集的操作和管理都要經過dashboard,因此dashboard中必須存有叢集狀態。codis如何處理狀態快取的有效性和過期問題呢?沒錯,就是上面程式碼中看起來很像的四個Goroutine。前兩個方法的具體實現在/pkg/topom/topom_stats.go中,後兩個方法的具體實現則是在/pkg/topom/topom_action.go。在下一節 Codis原始碼解析——dashboard的啟動(2)我們具體講這四個方法的實現
總結一下,啟動dashboard的過程中,需要連線zk,建立Topom這個struct,通過18080這個埠與叢集進項互動,並將該埠收到的資訊進行轉發。最重要的是啟動了四個goroutine,重新整理叢集中的redis和proxy的狀態,以及處理slot和同步操作。