Codis原始碼解析——dashboard的啟動(2)
1 重新整理redis狀態
首先認識兩個重要的struct
type Future struct {
sync.Mutex
wait sync.WaitGroup
vmap map[string]interface{}
}
type RedisStats struct {
//儲存了叢集中Redis伺服器的各種資訊和統計數值,詳見redis的info命令
Stats map[string]string `json:"stats,omitempty"`
Error *rpc.RemoteError `json:"error,omitempty"`
Sentinel map[string]*redis.SentinelGroup `json:"sentinel,omitempty"`
UnixTime int64 `json:"unixtime"`
Timeout bool `json:"timeout,omitempty"`
}
接下來看看dashboard如何重新整理redis狀態
func (s *Topom) RefreshRedisStats(timeout time.Duration) (*sync2.Future, error) {
s.mu.Lock()
defer s.mu.Unlock()
//從快取中讀出slots,group,proxy,sentinel等資訊封裝在context struct中
ctx, err := s.newContext()
if err != nil {
return nil, err
}
var fut sync2.Future
goStats := func(addr string, do func(addr string) (*RedisStats, error)) {
fut.Add()
go func() {
stats := s.newRedisStats(addr, timeout, do)
stats.UnixTime = time.Now().Unix()
//vmap中新增鍵為addr,值為RedisStats的map
fut.Done(addr, stats)
}()
}
//遍歷ctx中的group,再遍歷每個group中的Server。如果對group和Server結構不清楚的,可以看看/pkg/models/group.go檔案
//每個Group除了id,還有一個屬性就是GroupServer。每個GroupServer有自己的地址、資料中心、action等等
for _, g := range ctx.group {
for _, x := range g.Servers {
goStats(x.Addr, func(addr string) (*RedisStats, error) {
//前面我們已經說過,Topom中有三個redis pool,分別是action,stats,ha。pool本質上就是map[String]*list.List。
//這個是從stats的pool中根據Server的地址從pool中取redis client,如果沒有client,就建立
//然後加入到pool裡面,並通過Info命令獲取詳細資訊。整個流程和下面的sentinel類似,這裡就不放具體的方法實現了
m, err := s.stats.redisp.InfoFull(addr)
if err != nil {
return nil, err
}
return &RedisStats{Stats: m}, nil
})
}
}
//通過sentinel維護codis叢集中每一組的主備關係
for _, server := range ctx.sentinel.Servers {
goStats(server, func(addr string) (*RedisStats, error) {
c, err := s.ha.redisp.GetClient(addr)
if err != nil {
return nil, err
}
//實際上就是將client加入到Pool的pool屬性裡面去,pool本質上就是map[String]*list.List
//鍵是client的addr,值是client本身
//如果client不存在,就新建一個空的list
defer s.ha.redisp.PutClient(c)
m, err := c.Info()
if err != nil {
return nil, err
}
sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
//獲得map[string]*SentinelGroup,鍵是每一組的master的名字,SentinelGroup則是主從對
p, err := sentinel.MastersAndSlavesClient(c)
if err != nil {
return nil, err
}
return &RedisStats{Stats: m, Sentinel: p}, nil
})
}
//前面的所有gostats執行完之後,遍歷Future的vmap,將值賦給Topom.stats.servers
go func() {
stats := make(map[string]*RedisStats)
for k, v := range fut.Wait() {
stats[k] = v.(*RedisStats)
}
s.mu.Lock()
defer s.mu.Unlock()
s.stats.servers = stats
}()
return &fut, nil
}
func (p *Pool) GetClient(addr string) (*Client, error) {
c, err := p.getClientFromCache(addr)
if err != nil || c != nil {
return c, err
}
return NewClient(addr, p.auth, p.timeout)
}
func (p *Pool) getClientFromCache(addr string) (*Client, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil, ErrClosedPool
}
if list := p.pool[addr]; list != nil {
for i := list.Len(); i != 0; i-- {
c := list.Remove(list.Front()).(*Client)
//一個client可被回收的條件是,Pool的timeout為0,或者這個client上一次使用距離現在小於Pool.timeout
//ha和stats裡面的Pool的timeout為5秒,action的則根據配置檔案dashboard.toml中的migration_timeout一項來決定
if p.isRecyclable(c) {
return c, nil
} else {
c.Close()
}
}
}
return nil, nil
}
type Client struct {
conn redigo.Conn
Addr string
Auth string
Database int
LastUse time.Time
Timeout time.Duration
}
RedisStats中的sentinel如下所示,有幾組主備,就有幾組SentinelGroup,鍵是product-name與group-id拼起來的
newContext一步主要就是呼叫refillCache,過載了四個快取,分別是refillCacheSlots,refillCacheGroup,refillCacheProxy和refillCacheSentinel。這四個方法基本一致,以refillCacheSlots為例。方法傳入的是Topom.cache.slots
type context struct {
slots []*models.SlotMapping
group map[int]*models.Group
proxy map[string]*models.Proxy
sentinel *models.Sentinel
hosts struct {
sync.Mutex
m map[string]net.IP
}
method int
}
//重新填充topom.cache中的資料,並賦給context結構
func (s *Topom) newContext() (*context, error) {
if s.closed {
return nil, ErrClosedTopom
}
if s.online {
if err := s.refillCache(); err != nil {
return nil, err
} else {
ctx := &context{}
ctx.slots = s.cache.slots
ctx.group = s.cache.group
ctx.proxy = s.cache.proxy
ctx.sentinel = s.cache.sentinel
ctx.hosts.m = make(map[string]net.IP)
ctx.method, _ = models.ParseForwardMethod(s.config.MigrationMethod)
return ctx, nil
}
} else {
return nil, ErrNotOnline
}
}
func (s *Topom) refillCacheSlots(slots []*models.SlotMapping) ([]*models.SlotMapping, error) {
//如果cache中的slots為空,就直接返回store裡面的slots
if slots == nil {
return s.store.SlotMappings()
}
for i, _ := range slots {
//如果cache中的slots[i]不為空,直接進入下一個迴圈
if slots[i] != nil {
continue
}
//如果slots[i]為空,就從store中取出對應的SlotMapping並賦值給cache中的這個slot
m, err := s.store.LoadSlotMapping(i, false)
if err != nil {
return nil, err
}
if m != nil {
slots[i] = m
} else {
//如果store中取出的對應的SlotMapping也為空,就新建一個SlotMapping賦值給當前slot
slots[i] = &models.SlotMapping{Id: i}
}
}
return slots, nil
}
func (s *Store) LoadSlotMapping(sid int, must bool) (*SlotMapping, error) {
//返回值b是zkClient根據路徑轉化成的byte陣列
b, err := s.client.Read(s.SlotPath(sid), must)
if err != nil || b == nil {
return nil, err
}
m := &SlotMapping{}
//將byte陣列封裝在實體類SlotMapping實體類中
if err := jsonDecode(m, b); err != nil {
return nil, err
}
return m, nil
}
func (s *Store) SlotPath(sid int) string {
return SlotPath(s.product, sid)
}
//這裡的codisDir是/codis3
func SlotPath(product string, sid int) string {
return filepath.Join(CodisDir, product, "slots", fmt.Sprintf("slot-%04d", sid))
}
type SlotMapping struct {
Id int `json:"id"`
GroupId int `json:"group_id"`
Action struct {
Index int `json:"index,omitempty"`
State string `json:"state,omitempty"`
TargetId int `json:"target_id,omitempty"`
} `json:"action"`
}
總結一下,重新整理redis的過程中,首先建立上下文,從cache中讀取slots,group,proxy,sentinel等資訊,如果讀不到就通過store從zk上獲取,如果zk中也為空就建立。遍歷叢集中的redis伺服器以及主從關係,建立RedisStats並與addr關聯形成map,儲存在future的vmap中。全部儲存完後,再把vmap寫入Topom.stats.servers
我們可以在控制檯上打印出Topom.stats.redisp的相關資訊。因為goroutine中設定了每個一秒休眠,所以叢集的redisp實際上是每秒重新整理一次
stats redisp: &{{0 0} map[*.*.*.*:6379:0xc4206933e0 *.*.*.*:6380:0xc420693890 127.0.0.1:6379:0xc4206be540] 5000000000 {0xc420320000} false}
2 重新整理proxy狀態
重新整理proxy狀態的程式碼和重新整理redis的類似,就不贅述了。可以參照Codis原始碼解析——proxy新增到叢集
最後的步驟
3 處理同步操作
首先要明白,同步操作,指的就是一個group中的主從codis-server伺服器之間進行資料的同步,GroupServer是Group的一個屬性,標明瞭當前group中的所有codis-server的地址和action等等資訊
type Group struct {
Id int `json:"id"`
Servers []*GroupServer `json:"servers"`
Promoting struct {
Index int `json:"index,omitempty"`
State string `json:"state,omitempty"`
} `json:"promoting"`
OutOfSync bool `json:"out_of_sync"`
}
type GroupServer struct {
Addr string `json:"server"`
DataCenter string `json:"datacenter"`
Action struct {
Index int `json:"index,omitempty"`
State string `json:"state,omitempty"`
} `json:"action"`
ReplicaGroup bool `json:"replica_group"`
}
我們直接看ProcessSyncAction,在/pkg/topom/topom_action.go檔案中
func (s *Topom) ProcessSyncAction() error {
//同步操作之前的準備工作
addr, err := s.SyncActionPrepare()
if err != nil || addr == "" {
return err
}
log.Warnf("sync-[%s] process action", addr)
//執行同步操作
exec, err := s.newSyncActionExecutor(addr)
if err != nil || exec == nil {
return err
}
return s.SyncActionComplete(addr, exec() != nil)
}
同步操作之前的準備工作是,使用s.newContext()獲取上下文,從上下文中,遍歷每個group中的每個codis-server,從Action.State為pending的codis-server中,選出Action.Index最小的那臺伺服器,並獲取其所在的group,如果這個group的Promoting.State為nothing,這臺伺服器就可以從主伺服器同步資料。將這個codis-server的Action.Index設為0,Action.State設為syncing,更新zk中儲存的資訊,並將cache中關於這臺伺服器的資訊設為nil,這樣下次就會從store中重新載入資料到cache。
下一步,檢查當前server在group中的index,如果index不為0,就表示這臺server不是group中的主伺服器(codis是將group中index為0的那臺server作為主的),下一步就是當前server從主服務同步資料,通過redigo傳送同步命令
return func() error {
c, err := redis.NewClient(addr, s.config.ProductAuth, time.Minute*30)
if err != nil {
log.WarnErrorf(err, "create redis client to %s failed", addr)
return err
}
defer c.Close()
if err := c.SetMaster(master); err != nil {
log.WarnErrorf(err, "redis %s set master to %s failed", addr, master)
return err
}
return nil
}, nil
func NewClient(addr string, auth string, timeout time.Duration) (*Client, error) {
c, err := redigo.Dial("tcp", addr, []redigo.DialOption{
redigo.DialConnectTimeout(math2.MinDuration(time.Second, timeout)),
redigo.DialPassword(auth),
redigo.DialReadTimeout(timeout), redigo.DialWriteTimeout(timeout),
}...)
if err != nil {
return nil, errors.Trace(err)
}
return &Client{
conn: c, Addr: addr, Auth: auth,
LastUse: time.Now(), Timeout: timeout,
}, nil
}
func (c *Client) SetMaster(master string) error {
host, port, err := net.SplitHostPort(master)
if err != nil {
return errors.Trace(err)
}
c.conn.Send("MULTI")
c.conn.Send("CONFIG", "SET", "masterauth", c.Auth)
c.conn.Send("SLAVEOF", host, port)
c.conn.Send("CONFIG", "REWRITE")
c.conn.Send("CLIENT", "KILL", "TYPE", "normal")
values, err := redigo.Values(c.Do("EXEC"))
if err != nil {
return errors.Trace(err)
}
for _, r := range values {
if err, ok := r.(redigo.Error); ok {
return errors.Trace(err)
}
}
return nil
}
同步之後,會將這臺codis-server的Action.State設定為”synced”或者”synced_failed”,並在zk中更新相關資訊,抹除cache。
注意,儘管整個過程中,都用了鎖,每次還是會檢查group的Promoting.State是否nothing,codis-server的Action.Index是否為0,Action.State是否為syncing,只有全部符合才進行同步
4 處理slot操作
到這裡,dashboard的啟動工作已經完成,可以看到,dashboard啟動過程中,實際上啟動了很多goroutine來對後續操作進行處理,這些我們都會在後面的文章的具體章節中做分析,這一節只需要關注到dashboard啟動過程中做了什麼即可。
相關推薦
myBatis原始碼解析-快取篇(2)
上一章分析了mybatis的原始碼的日誌模組,像我們經常說的mybatis一級快取,二級快取,快取究竟在底層是怎樣實現的。此次開始分析快取模組 1. 原始碼位置,mybatis原始碼包位於org.apache.ibatis.cache下,如圖 2. 先從org.apache.ibatis.cache下的cac
Codis原始碼解析——dashboard的啟動(2)
1 重新整理redis狀態 首先認識兩個重要的struct type Future struct { sync.Mutex wait sync.WaitGroup vmap map[string]interface{} } typ
Codis原始碼解析——dashboard的啟動(1)
dashboard是codis的叢集管理工具,支援proxy和server的新增、刪除、資料遷移,所有對叢集的操作必須通過dashboard。dashboard的啟動過程和proxy類似。dashboard的啟動只是初始化一些必要的資料結構,複雜的在於對叢集的操
TiKV 原始碼解析系列文章(七)gRPC Server 的初始化和啟動流程
作者:屈鵬 本篇 TiKV 原始碼解析將為大家介紹 TiKV 的另一週邊元件—— grpc-rs。grpc-rs 是 PingCA
解析MySQL binlog --(2)FORMAT_DESCRIPTION_EVENT
mysql binlog 該格式描述事件時binlog version 4中為了取代之前版本的START_EVENT_3事件而引入的。是binlog文件的第一個事件,並在一個binlog文件中僅出現一次。具體定義:binlog-version:binlog版本mysql-server version:
Dubbo——快速啟動(2)
快速啟動 Dubbo 採用全 Spring 配置方式,透明化接入應用,對應用沒有任何 API 侵入,只需用 Spring 載入 Dubbo 的配置即可,Dubbo 基於 Spring 的 Schema 擴充套件 進行載入。 mvn: <!-- dubbo 依賴-->
mybatis原始碼-解析配置檔案(三)之配置檔案Configuration解析(超詳細, 值得收藏)
1. 簡介 1.1 系列內容 本系列文章講解的是mybatis解析配置檔案內部的邏輯, 即 Reader reader = Resources.getResourceAsReader("mybatis-config.xml"); SqlSessionFact
mybatis原始碼-解析配置檔案(二)之解析的流程
1. 簡介 在之前的文章《mybatis 初步使用(IDEA的Maven專案, 超詳細)》中, 講解了mybatis的初步使用, 並總結了以下mybatis的執行流程: 通過 Resources 工具類讀取 mybatis-config.xml,
【原始碼剖析】Launcher 8.0 原始碼 25---使用者操作(2)模式切換
模式就是介面,除普通模式外,Launcher還有兩個特殊模式,分別是overView模式和Springloader模式。此處採用狀態模式這種設計模式,共有三個狀態。 overView模式是長按桌面空白處,出現特殊功能,比如設定桌布,新增widget,特殊設定(橫屏開關
caffe原始碼解析:層(layer)的註冊與管理
caffe中所有的layer都是類的結構,它們的構造相關的函式都註冊在一個全域性變數g_registry_ 中。 首先這個變數的型別 CreatorRegistry是一個map定義, public: typedef shared_ptr<Layer<Dt
mybatis原始碼-解析配置檔案(四)之配置檔案Mapper解析
其中, mappers作為configuration節點的一部分配置, 在本文章中, 我們講解解析mappers節點, 即 xxxMapper.xml 檔案的解析。 1 解析入口 在解析 mybatis-config.xml 時, 會進行解析 xxxMapper.xml 的檔案。 在圖示流程的 XMLCo
java集合類原始碼詳解-ArrayList(2)
上次關於ArrayList的結構沒有做總結。這次還是補充在自己部落格裡面吧。 ArrayList繼承自一個抽象類。實現了四個介面。 AbstractList繼承自AbstractCollection。AbstractCollection繼承自Object。 ArrayL
CentOS 7安裝Oracle 11gR2以及設定自啟動(2)
6、建立表空間和使用者授權 (1)、連線資料庫 $ sqlplus / as sysdba (2)、建立資料庫表空間 語法: create tablespace 表空間名 datafile ‘實體地址(相當於檔案路徑)’ size初始大小(單位M) autoextend on next每次
python原始碼分析----記憶體分配(2)
早就應該寫部分的內容了。。。。最近比較負能量。。。傷不起啊。。 上一篇說到了,在python的記憶體分配中兩個非常重要的方法:PyObject_Malloc和PyObject_Free 在具體的來這兩個方法之前,先要看看別的一些東西 //這裡用usedpool構成了一個雙
Selenium2Library原始碼解析與擴充套件(一)
一直覺得Selenium2Library對selenium的封裝很贊,最近模擬它的結構封裝給一個同事寫了個C# selenium的demo,過程中看了細看了一部分原始碼。加上之前封裝的內容,分享一波。 注1:以下涉及到RF的指令碼全未加延時sleep,如需除錯
Hystrix 原始碼解析 —— 請求執行(四)之失敗回退邏輯
本文主要基於 Hystrix 1.5.X 版本 1. 概述本文主要分享 Hystrix 命令執行(四)之失敗回退邏輯。 建議 :對 RxJava 已經有一定的瞭解的基礎上閱讀本文。 Hystrix 執行命令整體流程如下圖: 紅圈 :Hy
TiKV 原始碼解析系列文章(三)Prometheus(上)
開發十年,就只剩下這套架構體系了! >>>
Dubbo原始碼解析之SPI(一):擴充套件類的載入過程
Dubbo是一款開源的、高效能且輕量級的Java RPC框架,它提供了三大核心能力:面向介面的遠端方法呼叫、智慧容錯和負載均衡,以及服務自動註冊和發現。 Dubbo最早是阿里公司內部的RPC框架,於 2011 年開源,之後迅速成為國內該類開源專案的佼佼者,2018年2月,通過投票正式成為 Apache基金會孵
myBatis原始碼解析-日誌篇(1)
上半年在進行知識儲備,下半年爭取寫一點好的部落格來記錄自己原始碼之路。在學習原始碼的路上也掌握了一些設計模式,可所謂一舉兩得。本次打算寫Mybatis的原始碼解讀。 準備工作 1. 下載mybatis原始碼 下載地址:https://github.com/mybatis/mybatis-3 2.
myBatis原始碼解析-資料來源篇(3)
前言:我們使用mybatis時,關於資料來源的配置多使用如c3p0,druid等第三方的資料來源。其實mybatis內建了資料來源的實現,提供了連線資料庫,池的功能。在分析了快取和日誌包的原始碼後,接下來分析mybatis中的資料來源實現。 類圖:mybatis中關於資料來源的原始碼包路徑如下: