1. 程式人生 > >Codis原始碼解析——處理slot操作(2)

Codis原始碼解析——處理slot操作(2)

這一篇我們把處理slot操作的機制講完。還剩最後兩個部分。一個是fillSlot,一個是每一個槽具體的處理方式。

本例中有兩個group,將之前auto-rebalance過的slot(0-511屬於group1,512-1023屬於group2)

這裡寫圖片描述

現在將slot 400到500移動到group2。我們之前走到了

s.newProxyClient(p).FillSlots(ctx.toSlotSlice(slots, p)...)

首先new一個ApiClient,裡面儲存了proxy的地址,以及xauth資訊(xauth是根據ProductName,ProductAuth以及proxy的token生成的)。下面呼叫Proxy的FillSlots方法,填充slot

//這裡傳過來的slots實際上只有一個,就是整個叢集中最小的Action.State對應的Slotmapping
func (s *Proxy) FillSlots(slots []*models.Slot) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.closed {
        return ErrClosedProxy
    }
    for _, m := range slots {
        if err := s.router.FillSlot(m); err != nil {
            return
err } } return nil }

func (s *Router) fillSlot(m *models.Slot, switched bool, method forwardMethod) {
    slot := &s.slots[m.Id]
    //將slot的lock.hold屬性設為true,並加鎖
    slot.blockAndWait()

    //這裡的bc就是我們之前提到過的處理redis請求的sharedBackendConn。第一次進來的時候,backend為空,這裡直接返回
    slot.backend
.bc.Release() slot.backend.bc = nil slot.backend.id = 0 slot.migrate.bc.Release() slot.migrate.bc = nil slot.migrate.id = 0 for i := range slot.replicaGroups { for _, bc := range slot.replicaGroups[i] { bc.Release() } } slot.replicaGroups = nil slot.switched = switched //這裡的addr就是我們的redis伺服器地址,10.0.2.15:6379 if addr := m.BackendAddr; len(addr) != 0 { //根據地址獲得sharedBackendConn slot.backend.bc = s.pool.primary.Retain(addr) slot.backend.id = m.BackendAddrGroupId } if from := m.MigrateFrom; len(from) != 0 { slot.migrate.bc = s.pool.primary.Retain(from) slot.migrate.id = m.MigrateFromGroupId } if !s.config.BackendPrimaryOnly { for i := range m.ReplicaGroups { var group []*sharedBackendConn for _, addr := range m.ReplicaGroups[i] { group = append(group, s.pool.replica.Retain(addr)) } if len(group) == 0 { continue } slot.replicaGroups = append(slot.replicaGroups, group) } } if method != nil { slot.method = method } if !m.Locked { slot.unblock() } if !s.closed { if slot.migrate.bc != nil { if switched { log.Warnf("fill slot %04d, backend.addr = %s, migrate.from = %s, locked = %t, +switched", slot.id, slot.backend.bc.Addr(), slot.migrate.bc.Addr(), slot.lock.hold) } else { log.Warnf("fill slot %04d, backend.addr = %s, migrate.from = %s, locked = %t", slot.id, slot.backend.bc.Addr(), slot.migrate.bc.Addr(), slot.lock.hold) } } else { if switched { log.Warnf("fill slot %04d, backend.addr = %s, locked = %t, +switched", slot.id, slot.backend.bc.Addr(), slot.lock.hold) } else { log.Warnf("fill slot %04d, backend.addr = %s, locked = %t", slot.id, slot.backend.bc.Addr(), slot.lock.hold) } } } }

上面的根據addr取得sharedBackendConnPool,是先從Router.pool.pimary這個sharedBackendConnPool中取,如果能取出就直接取,快取中為空的話就新建

func (p *sharedBackendConnPool) Retain(addr string) *sharedBackendConn {
    if bc := p.pool[addr]; bc != nil {
        return bc.Retain()
    } else {
        bc = newSharedBackendConn(addr, p)
        p.pool[addr] = bc
        return bc
    }
}
//更新每個sharedBackendConn的引用次數
func (s *sharedBackendConn) Retain() *sharedBackendConn {
    if s == nil {
        return nil
    }

    if s.refcnt <= 0 {
        log.Panicf("shared backend conn has been closed")
    } else {
        s.refcnt++
    }
    return s
}

func (s *haredBackendConn) Release() {
    if s == nil {
        return
    }
    if s.refcnt <= 0 {
        log.Panicf("shared backend conn has been closed, close too many times")
    } else {
        s.refcnt--
    }
    if s.refcnt != 0 {
        return
    }
    for _, parallel := range s.conns {
        for _, bc := range parallel {
            bc.Close()
        }
    }
    delete(s.owner.pool, s.addr)
}

之前我們說過,具體的redis請求都是通過sharedBackendConn來處理的,而新建sharedBackendConn就在這個地方。根據addr(也就是”10.0.2.15:6379”)獲取sharedBackendConn這個過程比較複雜,詳見Codis原始碼解析——sharedBackendConn

經過上面的過程之後,slot已經被填充了。slot的conns屬性就是與redis16個庫的連線

這裡寫圖片描述

下一步,再回到之前的真正對slot做操作的一步。即上一篇中的,取出plans,對每個slot進行處理。以id為510的slot為例,這個slot要從group1被遷移到group2

for sid, _ := range plans {
     fut.Add()
     go func(sid int) {
         log.Warnf("slot-[%d] process action", sid)
         //針對每個slot做處理
         var err = s.processSlotAction(sid)
         if err != nil {
             status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err)
             s.action.progress.status.Store(status)
         } else {
             s.action.progress.status.Store("")
         }
         //在Future的vmap中儲存slotId和對應的error,並呼叫WaitGroup.Done
         fut.Done(strconv.Itoa(sid), err)
     }(sid)
}
func (s *Topom) processSlotAction(sid int) error {
    var db int = 0
    for s.IsOnline() {
        //返回的exec就是具體的slot操作執行函式
        if exec, err := s.newSlotActionExecutor(sid); err != nil {
            return err
        } else if exec == nil {
            time.Sleep(time.Second)
        } else {
            n, nextdb, err := exec(db)
            if err != nil {
                return err
            }
            log.Debugf("slot-[%d] action executor %d", sid, n)

            if n == 0 && nextdb == -1 {
                return s.SlotActionComplete(sid)
            }
            status := fmt.Sprintf("[OK] Slot[%04d]@DB[%d]=%d", sid, db, n)
            s.action.progress.status.Store(status)

            if us := s.GetSlotActionInterval(); us != 0 {
                time.Sleep(time.Microsecond * time.Duration(us))
            }
            db = nextdb
        }
    }
    return nil
}
func (s *Topom) newSlotActionExecutor(sid int) (func(db int) (remains int, nextdb int, err error), error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    ctx, err := s.newContext()
    if err != nil {
        return nil, err
    }

    //根據slot的id獲取SlotMapping,主要方法就是return ctx.slots[sid], nil
    m, err := ctx.getSlotMapping(sid)
    if err != nil {
        return nil, err
    }

    switch m.Action.State {

    //最初slot還處在遷移過程中,即migrating
    case models.ActionMigrating:

        if s.action.disabled.IsTrue() {
            return nil, nil
        }
        //檢視m所在的group,如果存在group,而且其Promoting.State不為空字串,就返回true;否則返回false
        if ctx.isGroupPromoting(m.GroupId) {
            return nil, nil
        }
        if ctx.isGroupPromoting(m.Action.TargetId) {
            return nil, nil
        }

        //遷移過程中,一個slot本身所在的group以及目標group的Promoting.State都必須為空才可以做遷移
        from := ctx.getGroupMaster(m.GroupId)
        //取出group2的第一個server,也就是master,dest是"10.0.2.15:6380"
        dest := ctx.getGroupMaster(m.Action.TargetId)

        //Topom的action中的計數器加一
        s.action.executor.Incr()

        return func(db int) (int, int, error) {
            //每執行一個槽的遷移操作,Topom的action中的計數器就減1
            defer s.action.executor.Decr()
            if from == "" {
                return 0, -1, nil
            }
            //從cache中得到group1的redisClient,這個client由addr(10.0.2.15:6379), auth, timeout,Database,redigo.Conn組成;如果cache沒有,就新建。詳見文末的Client
            c, err := s.action.redisp.GetClient(from)
            if err != nil {
                return 0, -1, err
            }
            //將剛才新建的或者從cache中取出的redis client再put到Topom.action.redisp中
            defer s.action.redisp.PutClient(c)

            //這裡db是0,相當於redis從16個庫中選擇0號
            if err := c.Select(db); err != nil {
                return 0, -1, err
            }
            var do func() (int, error)

            method, _ := models.ParseForwardMethod(s.config.MigrationMethod)
            switch method {
            case models.ForwardSync:
                do = func() (int, error) {
                    //呼叫redis的SLOTSMGRTTAGSLOT命令,隨機選擇當前slot的一個key,並將與這個key的tag相同的k-v全部遷移到目標機
                    return c.MigrateSlot(sid, dest)
                }
            case models.ForwardSemiAsync:
                var option = &redis.MigrateSlotAsyncOption{
                    MaxBulks: s.config.MigrationAsyncMaxBulks,
                    MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
                    NumKeys:  s.config.MigrationAsyncNumKeys,
                    Timeout: math2.MinDuration(time.Second*5,
                        s.config.MigrationTimeout.Duration()),
                }
                do = func() (int, error) {
                    //呼叫redis的SLOTSMGRTTAGSLOT-ASYNC命令,引數是target redis的ip和port,也就是10.0.2.15和6380
                    return c.MigrateSlotAsync(sid, dest, option)
                }
            default:
                log.Panicf("unknown forward method %d", int(method))
            }

            n, err := do()
            if err != nil {
                return 0, -1, err
            } else if n != 0 {
                return n, db, nil
            }

            nextdb := -1
            //通過info命令查keyspace資訊並做處理,這裡取出的m為空
            m, err := c.InfoKeySpace()
            if err != nil {
                return 0, -1, err
            }
            for i := range m {
                if (nextdb == -1 || i < nextdb) && db < i {
                    nextdb = i
                }
            }
            //返回的nextdb是-1
            return 0, nextdb, nil
        }, nil

    case models.ActionFinished:

        return func(int) (int, int, error) {
            return 0, -1, nil
        }, nil

    default:

        return nil, errors.Errorf("slot-[%d] action state is invalid", m.Id)

    }
}
func (s *Topom) SlotActionComplete(sid int) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    ctx, err := s.newContext()
    if err != nil {
        return err
    }

    m, err := ctx.getSlotMapping(sid)
    if err != nil {
        return err
    }

    log.Warnf("slot-[%d] action complete:\n%s", m.Id, m.Encode())

    switch m.Action.State {

    //首先狀態是migrating
    case models.ActionMigrating:

        defer s.dirtySlotsCache(m.Id)

        //推進到finished
        m.Action.State = models.ActionFinished
        if err := s.storeUpdateSlotMapping(m); err != nil {
            return err
        }

        fallthrough

    case models.ActionFinished:

        log.Warnf("slot-[%d] resync to finished", m.Id)

        //這個方法在上一節介紹過,本節就不再贅述了
        if err := s.resyncSlotMappings(ctx, m); err != nil {
            log.Warnf("slot-[%d] resync to finished failed", m.Id)
            return err
        }
        defer s.dirtySlotsCache(m.Id)

        m = &models.SlotMapping{
            Id:      m.Id,
            GroupId: m.Action.TargetId,
        }
        return s.storeUpdateSlotMapping(m)

    default:

        return errors.Errorf("slot-[%d] action state is invalid", m.Id)

    }
}

在Codis中,對於redis的操作都是通過Client中的redigo.Conn(一個第三方redis客戶端)發命令的。如下所示

type Client struct {
    conn redigo.Conn
    Addr string
    Auth string

    Database int

    LastUse time.Time
    Timeout time.Duration
}
func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) {
    r, err := c.conn.Do(cmd, args...)
    if err != nil {
        return nil, errors.Trace(err)
    }
    c.LastUse = time.Now()

    if err, ok := r.(redigo.Error); ok {
        return nil, errors.Trace(err)
    }
    return r, nil
}

到這裡,slot的操作就結束了。在頁面中可以看到slot已經遷移成功

這裡寫圖片描述

從dashboard的日誌中,我們也能瞭解到,首先是將所有slot的action.state設為pending,然後再逐個遷移每個slot。下面以id為1022的slot遷移到group1舉例

//之前已經先把所有slot的action.state設為pending
.
.
.
2017/10/26 10:04:45 topom_slots.go:240: [WARN] slot-[1022] action prepare:
{
    "id": 1022,
    "group_id": 0,
    "action": {
        "index": 1023,
        "state": "pending",
        "target_id": 1
    }
}
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
    "id": 1022,
    "group_id": 0,
    "action": {
        "index": 1023,
        "state": "preparing",
        "target_id": 1
    }
}
2017/10/26 10:04:45 topom_slots.go:259: [WARN] slot-[1022] resync to prepared
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
    "id": 1022,
    "group_id": 0,
    "action": {
        "index": 1023,
        "state": "prepared",
        "target_id": 1
    }
}
2017/10/26 10:04:45 topom_slots.go:279: [WARN] slot-[1022] resync to migrating
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
    "id": 1022,
    "group_id": 0,
    "action": {
        "index": 1023,
        "state": "migrating",
        "target_id": 1
    }
}
2017/10/26 10:04:45 topom_action.go:56: [WARN] slot-[1022] process action
2017/10/26 10:04:45 topom_slots.go:320: [WARN] slot-[1022] action complete:
{
    "id": 1022,
    "group_id": 0,
    "action": {
        "index": 1023,
        "state": "migrating",
        "target_id": 1
    }
}
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
    "id": 1022,
    "group_id": 0,
    "action": {
        "index": 1023,
        "state": "finished",
        "target_id": 1
    }
}
2017/10/26 10:04:45 topom_slots.go:337: [WARN] slot-[1022] resync to finished
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
    "id": 1022,
    "group_id": 1,
    "action": {}
}

總結一下,slot的遷移工作是很複雜的。對叢集中的slot手動進行rebalance之後,每個slot都被指定了相應的遷移計劃。對叢集中的slot做SlotActionPrepareFilter處理,先找Action.State既不為空也不是pending的SlotMapping中Action.Index最小的SlotMapping,找不到的話就去找Action.State為pending的SlotMapping中Action.Index最小的SlotMapping,找到之後逐個變更每個SlotMapping的action.state,在zk上更新,Action.state符合preparing或者prepared的時候,要呼叫Topom的resyncSlotMappings方法,根據SlotMapping的引數同步到models.Slot(ctx.toSlotSlice方法),再同步到Proxy.Slot(Router.fillslot方法),這個過程中,每個Slot都被分配了backendConn,這個backendConn是從Proxy.Router.pool中取出來的(沒有就新建,再放到pool中)。

上面的遷移準備工作完成之後,再逐個處理每一個slot的遷移。只有一個slot本身所在的group以及目標group的Promoting.State都為空時,才可以做遷移。槽的遷移是由Topom.action.redisp裡面的from client來進行的,分為sync和semi-sync。每一個slot遷移完成之後,再呼叫SlotActionComplete,推進slot的action.state,更新zk上的資訊,並呼叫resyncSlotMappings同步叢集中的slot資訊,例如釋放掉migrate.bc。

相關推薦

Codis原始碼解析——處理slot操作2

這一篇我們把處理slot操作的機制講完。還剩最後兩個部分。一個是fillSlot,一個是每一個槽具體的處理方式。 本例中有兩個group,將之前auto-rebalance過的slot(0-511屬於group1,512-1023屬於group2) 現在

Codis原始碼解析——處理slot操作1

上一篇我們講了slot在叢集中的分配方式,重點講了auto-rebalance的原理。之前我們說過,再啟動dashboard的時候,有一個goroutine專門用來處理slot的操作。這一篇我們就來看看slot的操作是如何進行的。我們這裡舉例也是用叢集中有兩個g

Codis原始碼解析——dashboard的啟動2

1 重新整理redis狀態 首先認識兩個重要的struct type Future struct { sync.Mutex wait sync.WaitGroup vmap map[string]interface{} } typ

SOFABoot原始碼解析之啟動原理2-原始碼解析

1.  private voidprocessImports(ConfigurationClass configClass, SourceClass currentSourceClass,2.           Collection<SourceClass>importCandidates, b

Codis原始碼解析——dashboard的啟動1

dashboard是codis的叢集管理工具,支援proxy和server的新增、刪除、資料遷移,所有對叢集的操作必須通過dashboard。dashboard的啟動過程和proxy類似。dashboard的啟動只是初始化一些必要的資料結構,複雜的在於對叢集的操

原始碼剖析】Launcher 8.0 原始碼 25---使用者操作2模式切換

 模式就是介面,除普通模式外,Launcher還有兩個特殊模式,分別是overView模式和Springloader模式。此處採用狀態模式這種設計模式,共有三個狀態。 overView模式是長按桌面空白處,出現特殊功能,比如設定桌布,新增widget,特殊設定(橫屏開關

原始碼分析篇--Java集合操作2

4、兩大集合介面 在Java集合中,有兩大集合,一個是Collection介面及其實現類,另一個是Map介面及其實現類。下面給出這兩種集合的框架圖。如下所示。 4.1Collection介面框架圖 4.2Map介面框架圖 從上面兩個框架圖可以看出,Cllection介面和Map介面是

基於Unity的FFT快速傅立葉變換的頻譜解析的研究_預處理音訊分析2

Unity中的演算法節拍對映:預處理音訊分析 如果您還沒有閱讀本系列中的上一篇文章“關於FFT的音訊解析的研究_實時取樣解析音訊(1)”,使用Unity API進行實時音訊分析,請在閱讀之前花些時間這樣做。 它涵蓋了執行預處理分析所需的許多核心概念。 在進行實時分析時,我

Hibernate的增刪改查操作2

rac result jlist static 面向 原生 comm public set 一、在Hibernate中使用原生SQL語句 sql語句面向的是數據庫,所以sql語句中對應的不再是bean了,比如sql="select * from user" 在hql中

HectorSLAM論文解析?代碼重寫2

開始 機器人 狀態 核心 基本上 測試數據 logs grid 機器 這篇文章為HectorSLAM系列的以下部分 HectorSLAM的整體邏輯 激光匹配 地圖構造 地圖更新 500行代碼重寫一個LidarSLAM 測試數據的準備,和測試數據讀取模塊的編寫

SpringMVC基本操作2

正常 esp 模型 over edi 錯誤 handle 之前 表現 1.)使用 POJO 對象綁定請求參數值 ? Spring MVC 會按請求參數名和 POJO 屬性名進行自動匹 配,自動為該對象填充屬性值。支持級聯屬性。 如:dept.deptId、dept.addr

27. Python對Mysql的操作2

python mysql1.遊標遊標是系統為用戶開設的一個數據緩沖區,存放SQL語句的執行結果用戶可以用SQL語句逐一從遊標中獲取記錄,並賦給主變量,交由python進一步處理,一組主變量一次只能存放一條記錄僅使用主變量並不能完全滿足SQL語句向應用程序輸出數據的要求遊標提供了一種對從表中檢索出的數據進行操作

MySQL數據庫操作2基本操作

大於 ase 存在 delete div .... desc 搜索 查看數據庫 創建數據庫:CREATE DATABASE [IF NOT EXISTS] 庫名例子:CREATE DATABASE `mydb`;CREATE DATABASE IF NOT EXISTS `

MySQL常用操作2MySQL用戶管理、常用sql語句、 MySQL數據庫備份恢復

MySQL用戶管理 MySQL用戶管理創建一個普通用戶並且授權1.grant all on *.* to 'user1' identified by 'passwd';grant all on *.* to 'user1' iden

matlab基本操作2

sin res 9.png 一個 nbsp 分享圖片 輸出 .com 返回 %求特征值和特征向量 x=0:0.01:50; A=[1 2 3 12;4 5 6 11;7 8 9 10;2 3 4 5]; B=[2 7;3 4]; eig(A); % ans = % %

EntityFramework Core筆記:表結構及數據操作2

IV totable prot table AS lec ext lib models 1. 表結構操作 1.1 表名   Data Annotations: using System.ComponentModel.DataAnnotations.Schema;

MATLAB編程與應用系列-第2章 數組及矩陣的創建及操作2

示例 例如 matrix 6.2 由於 變量 com 語法 2.4 本系列教程來源於出版設計《基於MATLAB編程基礎與典型應用書籍》,如涉及版權問題,請聯系:[email protected]。 出版社:人民郵電出版社, 頁數:525。 本系列教程目前基於MATLABR20

自然場景文字處理論文整理2STN-OCR

今天是進入公司實習的第三週了,在小組內負責的工作主要是和自然場景文字檢測相關的內容。這裡把看過的論文做一下翻譯和整理,也方便自己日後檢視。 Paper:STN-OCR: A single Neural Network for Text Detection and Text Recogn

Django ORM相關操作2

今天就講講關於雙下劃線的操作,這是第二篇關於orm相關操作的文章,還想看請往上翻第一篇。   # -*- coding: utf-8 -*- # @Time : 2018/11/15 19:26 # @Author : lh # @Email : .com # @F

Robot Framework - 入門與操作2

04- 建立測試庫--基礎概念 Robot Framework 實際的測試能力是由測試庫提供的。 ***** 支援的程式語言 Robot Framework 自身是用 Python 編寫的,能使用 Python 擴充套件測試庫。 如果在 Jython 執行Robot Frame