1. 程式人生 > >Codis原始碼解析——proxy新增到叢集

Codis原始碼解析——proxy新增到叢集

前面我們說過,proxy啟動之後,會預設處於 waiting 狀態,以一秒一次的頻率重新整理狀態,監聽proxy_addr 地址(預設配置檔案中的19000埠),但是不會 accept 連線,通過fe或者命令列新增到叢集並完成叢集狀態的同步,才能改變狀態為online。那麼,將proxy新增到叢集的過程中發生了什麼?這一篇我們就來看看。

通過介面新增比較簡單,直接輸入proxy的地址即可

這裡寫圖片描述

主要呼叫的方法在/pkg/topom/topom_proxy.go中

//傳入的addr就是proxy的地址
func (s *Topom) CreateProxy(addr string) error {
    s.mu.Lock()
    defer
s.mu.Unlock() ctx, err := s.newContext() if err != nil { return err } //這裡的p就是根據proxy地址取出models.proxy,也就是/codis3/codis-wujiang/proxy路徑下面的那個proxy-token中的詳細資訊 p, err := proxy.NewApiClient(addr).Model() if err != nil { return errors.Errorf("[email protected]%s fetch model failed, %s"
, addr, err) } //這個ApiClient中儲存了proxy的地址,以及根據productName,productAuth(預設為空)以及token生成的auth c := s.newProxyClient(p) //之前我們說過,proxy啟動的時候,在s.setup(config)這一步,會生成一個xauth,儲存在Proxy的xauth屬性中,這一步就是講上面得到的xauth和啟動proxy時的xauth作比較,來唯一確定需要的xauth if err := c.XPing(); err != nil { return errors.Errorf("
[email protected]
%s check xauth failed, %s"
, addr, err) } //檢查上下文中的proxy是否已經有token,如果有的話,說明這個proxy已經新增到叢集了 if ctx.proxy[p.Token] != nil { return errors.Errorf("proxy-[%s] already exists", p.Token) } else { //上下文中所有proxy的最大id+1,賦給當前的proxy作為其id p.Id = ctx.maxProxyId() + 1 } defer s.dirtyProxyCache(p.Token) //到這一步,proxy已經新增成功,更新"/codis3/codis-wujiang/proxy/proxy-token"下面的proxy資訊 if err := s.storeCreateProxy(p); err != nil { return err } else { return s.reinitProxy(ctx, p, c) } }

下面我們著重看一下reinitProxy方法,這個方法裡面主要是包含三個模組,都是ApiClient的方法。前面我們已經說過,ApiClient中儲存了proxy的地址和auth

func (s *Topom) reinitProxy(ctx *context, p *models.Proxy, c *proxy.ApiClient) error {
    log.Warnf("proxy-[%s] reinit:\n%s", p.Token, p.Encode())
    //初始化1024個槽
    if err := c.FillSlots(ctx.toSlotSlice(ctx.slots, p)...); err != nil {
        log.ErrorErrorf(err, "proxy-[%s] fillslots failed", p.Token)
        return errors.Errorf("proxy-[%s] fillslots failed", p.Token)
    }
    if err := c.Start(); err != nil {
        log.ErrorErrorf(err, "proxy-[%s] start failed", p.Token)
        return errors.Errorf("proxy-[%s] start failed", p.Token)
    }
    //由於此時sentinels還沒有,傳入的server佇列為空,所以這個方法我們暫時可以不管
    if err := c.SetSentinels(ctx.sentinel); err != nil {
        log.ErrorErrorf(err, "proxy-[%s] set sentinels failed", p.Token)
        return errors.Errorf("proxy-[%s] set sentinels failed", p.Token)
    }
    return nil
}

ctx.toSlotSlice主要就是根據models.SlotMapping來建立1024個models.Slot

func (ctx *context) toSlot(m *models.SlotMapping, p *models.Proxy) *models.Slot {
    slot := &models.Slot{
        Id:     m.Id,
        Locked: ctx.isSlotLocked(m),

        ForwardMethod: ctx.method,
    }
    switch m.Action.State {
    case models.ActionNothing, models.ActionPending:
        slot.BackendAddr = ctx.getGroupMaster(m.GroupId)
        slot.BackendAddrGroupId = m.GroupId
        slot.ReplicaGroups = ctx.toReplicaGroups(m.GroupId, p)
    case models.ActionPreparing:
        slot.BackendAddr = ctx.getGroupMaster(m.GroupId)
        slot.BackendAddrGroupId = m.GroupId
    case models.ActionPrepared:
        fallthrough
    case models.ActionMigrating:
        slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId)
        slot.BackendAddrGroupId = m.Action.TargetId
        slot.MigrateFrom = ctx.getGroupMaster(m.GroupId)
        slot.MigrateFromGroupId = m.GroupId
    case models.ActionFinished:
        slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId)
        slot.BackendAddrGroupId = m.Action.TargetId
    default:
        log.Panicf("slot-[%d] action state is invalid:\n%s", m.Id, m.Encode())
    }
    return slot
}

Topom.FillSlots階段,根據之前的1024個models.Slot,建立1024個/pkg/proxy/slots.go中的Slot,並且建立了SharedBackendConn。之前在Codis原始碼解析——proxy監聽redis請求中我們提到過,redis請求是由sharedBackendConn中取出的一個BackendConn進行處理的。而sharedBackendConn就是在填充槽的過程中建立的,如下列程式碼所示。這個方法由Proxy.Router呼叫,第一個引數是1024個models.slot中的每一個,第二個引數是寫死的false,第三個是FowardSemiAsync(這個是由配置檔案dashboard.toml中的migration_method指定的)。

之前我們說過,Proxy.Router中儲存了叢集中所有sharedBackendConnPool和slot,用於將redis請求轉發給相應的slot進行處理,而Router裡面的sharedBackendConnPool和slot就是在這裡進行填充的

func (s *Router) fillSlot(m *models.Slot, switched bool, method forwardMethod) {
    slot := &s.slots[m.Id]
    slot.blockAndWait()

    //清空models.Slot裡面的backendConn
    slot.backend.bc.Release()
    slot.backend.bc = nil
    slot.backend.id = 0
    slot.migrate.bc.Release()
    slot.migrate.bc = nil
    slot.migrate.id = 0
    for i := range slot.replicaGroups {
        for _, bc := range slot.replicaGroups[i] {
            bc.Release()
        }
    }
    slot.replicaGroups = nil

    //false
    slot.switched = switched

    //初始階段addr和from都是空字串
    if addr := m.BackendAddr; len(addr) != 0 {
        //從Router的primary sharedBackendConnPool中取出addr對應的sharedBackendConn,如果沒有就新建並放入,也相當於初始化了
        slot.backend.bc = s.pool.primary.Retain(addr)
        slot.backend.id = m.BackendAddrGroupId
    }
    if from := m.MigrateFrom; len(from) != 0 {
        slot.migrate.bc = s.pool.primary.Retain(from)
        slot.migrate.id = m.MigrateFromGroupId
    }
    if !s.config.BackendPrimaryOnly {
        for i := range m.ReplicaGroups {
            var group []*sharedBackendConn
            for _, addr := range m.ReplicaGroups[i] {
                group = append(group, s.pool.replica.Retain(addr))
            }
            if len(group) == 0 {
                continue
            }
            slot.replicaGroups = append(slot.replicaGroups, group)
        }
    }
    if method != nil {
        slot.method = method
    }

    if !m.Locked {
        slot.unblock()
    }
    if !s.closed {
        if slot.migrate.bc != nil {
            if switched {
                log.Warnf("fill slot %04d, backend.addr = %s, migrate.from = %s, locked = %t, +switched",
                    slot.id, slot.backend.bc.Addr(), slot.migrate.bc.Addr(), slot.lock.hold)
            } else {
                log.Warnf("fill slot %04d, backend.addr = %s, migrate.from = %s, locked = %t",
                    slot.id, slot.backend.bc.Addr(), slot.migrate.bc.Addr(), slot.lock.hold)
            }
        } else {
            if switched {
                log.Warnf("fill slot %04d, backend.addr = %s, locked = %t, +switched",
                    slot.id, slot.backend.bc.Addr(), slot.lock.hold)
            } else {
                log.Warnf("fill slot %04d, backend.addr = %s, locked = %t",
                    slot.id, slot.backend.bc.Addr(), slot.lock.hold)
            }
        }
    }
}

這個階段主要是初始化槽。以id為0的槽為例,初始化之後,其結構如下圖所示,每個slot都被分配了相應的backendConn,只不過此時每個backendConn都為空

這裡寫圖片描述

這裡寫圖片描述

接下來主要介紹上面的Topom.start方法,主要就是將Proxy自身,和它的router和jodis設為上線,在zk中建立臨時節點/jodis/codis-productName/proxy-token,並監聽該節點的變化

func (s *Proxy) Start() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.closed {
        return ErrClosedProxy
    }
    if s.online {
        return nil
    }
    s.online = true
    //router的online屬性設為true
    s.router.Start()
    if s.jodis != nil {
        s.jodis.Start()
    }
    return nil
}

func (j *Jodis) Start() {
    j.mu.Lock()
    defer j.mu.Unlock()
    if j.online {
        return
    }
    //也是這個套路,先把online屬性設為true
    j.online = true

    go func() {
        var delay = &DelayExp2{
            Min: 1, Max: 30,
            Unit: time.Second,
        }
        for !j.IsClosed() {
            //這一步在zk中建立臨時節點/jodis/codis-wujiang/proxy-token,並新增監聽事件,監聽該節點中內容的改變。最終返回的w是一個chan struct{}。具體實現方法在下面的watch中
            w, err := j.Rewatch()
            if err != nil {
                log.WarnErrorf(err, "jodis watch node %s failed", j.path)
                delay.SleepWithCancel(j.IsClosed)
            } else {
                //從w中讀出zk下的變化
                <-w
                delay.Reset()
            }
        }
    }()
}
func (c *Client) watch(conn *zk.Conn, path string) (<-chan struct{}, error) {
    //GetW的核心方法就是下面的addWatcher,w就是addWatcher返回的ch
    _, _, w, err := conn.GetW(path)
    if err != nil {
        return nil, errors.Trace(err)
    }
    signal := make(chan struct{})
    go func() {
        defer close(signal)
        <-w
        log.Debugf("zkclient watch node %s update", path)
    }()
    return signal, nil
}
//這個類在/github.com/samuel/go-zookeeper/zk/conn.go中,返回一個channel。當關注的路徑下發生變更時,這個channel中就會讀出值
func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
    c.watchersLock.Lock()
    defer c.watchersLock.Unlock()

    ch := make(chan Event, 1)
    wpt := watchPathType{path, watchType}
    c.watchers[wpt] = append(c.watchers[wpt], ch)
    return ch
}

上面的Proxy中的jodis如下圖所示

這裡寫圖片描述

到這裡,proxy已經成功新增到叢集中。可以從三個方面看出來:
1 介面中顯示成功

這裡寫圖片描述

這裡多說一句,介面上顯示的total session,是歷史記錄總數,alive session才有意義。每次新建一個session,這兩個資料都會加一,但是如果alive session加一之後超過了proxy.toml中設定的proxy_max_clients(預設為1000),就不會建立連線,並會把alive session減一。alive session減一的另外兩種情況是,成功返回請求結果後,以及session過期後(預設為75秒)

2 zk中增加了/jodis/codis-wujiang/proxy-token路徑

這裡寫圖片描述

3 proxy的日誌從waiting變為working,dashboard的日誌列印建立了proxy-token

這裡寫圖片描述

這裡寫圖片描述

別忘了,在dashboard啟動的時候,啟動了goroutine來重新整理proxy的狀態,下面我們就來看看,當叢集中新增了proxy之後,是如何重新整理的

//上下文中儲存了當前叢集的slots、group、proxy、sentinels等資訊
type context struct {
    slots []*models.SlotMapping
    group map[int]*models.Group
    proxy map[string]*models.Proxy

    sentinel *models.Sentinel

    hosts struct {
        sync.Mutex
        m map[string]net.IP
    }
    method int
}
func (s *Topom) RefreshProxyStats(timeout time.Duration) (*sync2.Future, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    //在這個建構函式中將判斷cache中的資料是否為空,為空的話就通過store從zk中取出填進cache
    ctx, err := s.newContext() 
    if err != nil {
        return nil, err
    }
    var fut sync2.Future
    //由於我們剛才添加了proxy,這裡ctx.proxy已經不為空了
    for _, p := range ctx.proxy {
        //fut中的waitsGroup加1
        fut.Add()
        go func(p *models.Proxy) {
            stats := s.newProxyStats(p, timeout)
            stats.UnixTime = time.Now().Unix()
            //在fut的vmap屬性中,新增以proxy.Token為鍵,ProxyStats為值的map,並將waitsGroup減1
            fut.Done(p.Token, stats)

            switch x := stats.Stats; {
            case x == nil:
            case x.Closed || x.Online:
            //如果一個proxy因為某種情況出現error,被運維重啟之後,處於waiting狀態,會呼叫OnlineProxy方法將proxy重新新增到叢集中
            default:
                if err := s.OnlineProxy(p.AdminAddr); err != nil {
                    log.WarnErrorf(err, "auto online proxy-[%s] failed", p.Token)
                }
            }
        }(p)
    }
    ////當所有proxy.Token和ProxyStats的關係map建立好之後,存到Topom.stats.proxies中
    go func() {
        stats := make(map[string]*ProxyStats)

        for k, v := range fut.Wait() {
            stats[k] = v.(*ProxyStats)
        }
        s.mu.Lock()
        defer s.mu.Unlock()
        //Topom的stats結構中的proxies屬性,儲存了完整的stats資訊,回想我們之前介紹的,Topom儲存著叢集中的所有配置和節點資訊
        s.stats.proxies = stats
    }()
    return &fut, nil
}

以上步驟執行完之後,我們來看看返回值fut

這裡寫圖片描述

手動kill一個proxy的程序,發現叢集上顯示該proxy為紅色error,但是其實這個proxy並沒有被踢出叢集ctx。可能有人會問,如果一個proxy掛了,codis是怎麼知道不要把請求發到這個proxy的呢?其實,當下面幾種情況,都會呼叫proxy.close方法,這個裡面呼叫了Jodis的close方法,將zk上面的該proxy資訊刪除。jodis客戶端是通過zk轉發的,自然就不會把請求發到這個proxy上了

go func() {
        defer s.Close()
        c := make(chan os.Signal, 1)
        signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)

        sig := <-c
        log.Warnf("[%p] proxy receive signal = '%v'", s, sig)
}()
func (s *Proxy) serveAdmin() {
    if s.IsClosed() {
        return
    }
    defer s.Close()

    log.Warnf("[%p] admin start service on %s", s, s.ladmin.Addr())

    eh := make(chan error, 1)
    go func(l net.Listener) {
        h := http.NewServeMux()
        h.Handle("/", newApiServer(s))
        hs := &http.Server{Handler: h}
        eh <- hs.Serve(l)
    }(s.ladmin)

    select {
    case <-s.exit.C:
        log.Warnf("[%p] admin shutdown", s)
    case err := <-eh:
        log.ErrorErrorf(err, "[%p] admin exit on error", s)
    }
}

func (s *Proxy) serveProxy() {
    if s.IsClosed() {
        return
    }
    defer s.Close()

    log.Warnf("[%p] proxy start service on %s", s, s.lproxy.Addr())

    eh := make(chan error, 1)
    go func(l net.Listener) (err error) {
        defer func() {
            eh <- err
        }()
        for {
            c, err := s.acceptConn(l)
            if err != nil {
                return err
            }
            NewSession(c, s.config).Start(s.router)
        }
    }(s.lproxy)

    if d := s.config.BackendPingPeriod.Duration(); d != 0 {
        go s.keepAlive(d)
    }

    select {
    case <-s.exit.C:
        log.Warnf("[%p] proxy shutdown", s)
    case err := <-eh:
        log.ErrorErrorf(err, "[%p] proxy exit on error", s)
    }
}

總結一下,proxy啟動之後,一直處於waiting的狀態,直到將proxy新增到叢集。首先對要新增的proxy做引數校驗,根據proxy addr獲取一個ApiClient,更新zk中有關於這個proxy的資訊。接下來,從上下文中獲取ctx.slots,也就是[]*models.SlotMapping,建立1024個models.Slot,再填充1024個pkg/proxy/slots.go中的Slot,此過程中Router為每個Slot都分配了對應的backendConn。

下一步,將Proxy自身,和它的router和jodis設為上線,在zk中建立臨時節點/jodis/codis-productName/proxy-token,並監聽該節點的變化,如果有變化從相應的channel中讀出值。啟動dashboard的時候,有一個goroutine專門負責重新整理proxy狀態,將每一個proxy.Token和ProxyStats的關係map對應起來,存入Topom.stats.proxies中。

相關推薦

Codis原始碼解析——proxy新增叢集

前面我們說過,proxy啟動之後,會預設處於 waiting 狀態,以一秒一次的頻率重新整理狀態,監聽proxy_addr 地址(預設配置檔案中的19000埠),但是不會 accept 連線,通過fe或者命令列新增到叢集並完成叢集狀態的同步,才能改變狀態為onl

Codis原始碼解析——proxy的啟動

proxy啟動的時候,首先檢查輸入的命令列,一般情況下,啟動proxy的命令如下: nohup ./bin/codis-proxy --ncpu=2 --config=./conf/proxy.conf --log=./logs/proxy.log --l

Redis原始碼解析:28叢集(四)手動故障轉移、從節點遷移

一:手動故障轉移          Redis叢集支援手動故障轉移。也就是向從節點發送”CLUSTER  FAILOVER”命令,使其在主節點未下線的情況下,發起故障轉移流程,升級為新的主節點,而原來的主節點降級為從節點。          為了不丟失資料,向從節點發送”C

Redis原始碼解析:27叢集(三)主從複製、故障轉移

一:主從複製          在叢集中,為了保證叢集的健壯性,通常設定一部分叢集節點為主節點,另一部分叢集節點為這些主節點的從節點。一般情況下,需要保證每個主節點至少有一個從節點。          叢集初始化時,每個叢集節點都是以獨立的主節點角色而存在的,通過向叢集節點

Codis原始碼解析——處理slot操作(1)

上一篇我們講了slot在叢集中的分配方式,重點講了auto-rebalance的原理。之前我們說過,再啟動dashboard的時候,有一個goroutine專門用來處理slot的操作。這一篇我們就來看看slot的操作是如何進行的。我們這裡舉例也是用叢集中有兩個g

Redis原始碼解析:25叢集(一)握手、心跳訊息以及下線檢測

         Redis叢集是Redis提供的分散式資料庫方案,通過分片來進行資料共享,並提供複製和故障轉移功能。 一:初始化 1:資料結構 在原始碼中,通過server.cluster記錄整個叢集當前的狀態,比如叢集中的所有節點;叢集目前的狀態,比如是上線還是下線;

Codis原始碼解析——處理slot操作(2)

這一篇我們把處理slot操作的機制講完。還剩最後兩個部分。一個是fillSlot,一個是每一個槽具體的處理方式。 本例中有兩個group,將之前auto-rebalance過的slot(0-511屬於group1,512-1023屬於group2) 現在

註冊中心 Eureka 原始碼解析 —— Eureka-Server 叢集同步

點選上方“芋道原始碼”,選擇“置頂公眾號”技術文章第一時間送達!原始碼精品專欄 摘要: 原創出處

Codis原始碼解析——dashboard的啟動(2)

1 重新整理redis狀態 首先認識兩個重要的struct type Future struct { sync.Mutex wait sync.WaitGroup vmap map[string]interface{} } typ

caffe原始碼解析新增新的Layer(maxout)

本文分為兩部分,先寫一個入門的教程,然後再給出自己新增maxout與NIN的layer的方法 (一) Here's roughly the process I follow. Add a class declaration for your

Codis原始碼解析——Jodis

我們在java專案裡面連線已經搭建好的Codis叢集時,需要用到其java客戶端——Jodis。這一篇我們就來看看Jodis是如何操作對Codis叢集進行操作的。 import io.codis.jodis.JedisResourcePool; impor

Codis原始碼解析——sentinel的重同步(2)

Topom.ha.monitor本身相當於一個上帝視角的sentinel。它本身並不是一個實際的sentinel伺服器,但是它負責收集各個sentinel的監控資訊,並對叢集作出反饋。這一講我們就來看看Topom.ha.monitor。這一篇的原始碼也有助於大家

Codis原始碼解析——dashboard的啟動(1)

dashboard是codis的叢集管理工具,支援proxy和server的新增、刪除、資料遷移,所有對叢集的操作必須通過dashboard。dashboard的啟動過程和proxy類似。dashboard的啟動只是初始化一些必要的資料結構,複雜的在於對叢集的操

CODIS原始碼剖析二(codis-proxy功能實現)

一、codis-proxy啟動http服務接受叢集命令 func newApiServer(p *Proxy) http.Handler { m := martini.New() //go-martini web開發框架

elasticsearch叢集選舉原始碼解析

elasticsearch的節點Node在啟動的時候(也就是在start()方法中)開始加入叢集,並準備參與選舉。 在Node的start()方法中,會呼叫ZenDiscovery的startInitialJoin()方法開始加入叢集並準備進行參與選舉。 @Overr

HashMap原始碼解析jdk1.8:初始化resize,新增put,獲取get

原始碼解析有參考以下部落格: http://www.cnblogs.com/jzb-blog/p/6637823.html HashMap:   以k-v鍵值對儲存格式的容器,key,value都可以為空,key不重複,非執行緒安全(執行緒安全請使用Concur

dubbo原始碼解析五 --- 叢集容錯架構設計與原理分析

歡迎來我的 Star Followers 後期後繼續更新Dubbo別的文章 下面是個人部落格地址,頁面比部落格園美觀一些其他都是一樣的 目錄 面試中叢集容錯的經常的問題 Dubbo 官方文件關於叢集容錯的介紹 Dubbo叢集容錯的架構分析 Dubbo叢集容錯原始碼解析 面試中叢集容錯的經

Netty進階:Futrue&Promise原始碼解析

文章目錄 1. Future&Promise 2. AbstractFuture 3.Completefuture 4.Channelfuture&Completechannel

大資料基礎(1)zookeeper原始碼解析

五 原始碼解析   public enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING;}zookeeper伺服器狀態:剛啟動LOOKING,follower是FOLLOWING,leader是LEADING,observer是

Android框架原始碼解析之(四)Picasso

這次要分析的原始碼是 Picasso 2.5.2 ,四年前的版本,用eclipse寫的,但不影響這次我們對其原始碼的分析 地址:https://github.com/square/picasso/tree/picasso-parent-2.5.2 Picasso的簡單使用