1. 程式人生 > >CODIS原始碼剖析二(codis-proxy功能實現)

CODIS原始碼剖析二(codis-proxy功能實現)

一、codis-proxy啟動http服務接受叢集命令

func newApiServer(p *Proxy) http.Handler {
    m := martini.New()          //go-martini web開發框架
    m.Use(martini.Recovery())
    m.Use(render.Renderer())
    m.Use(func(w http.ResponseWriter, req *http.Request, c martini.Context) {
        path := req.URL.Path
        if req.Method
!= "GET" && strings.HasPrefix(path, "/api/") { var remoteAddr = req.RemoteAddr var headerAddr string for _, key := range []string{"X-Real-IP", "X-Forwarded-For"} { if val := req.Header.Get(key); val != "" { headerAddr = val break
} } log.Warnf("[%p] API call %s from %s [%s]", p, path, remoteAddr, headerAddr) } c.Next() }) m.Use(gzip.All()) m.Use(func(c martini.Context, w http.ResponseWriter) { w.Header().Set("Content-Type", "application/json; charset=utf-8"
) }) api := &apiServer{proxy: p} r := martini.NewRouter() r.Get("/", func(r render.Render) { r.Redirect("/proxy") }) r.Any("/debug/**", func(w http.ResponseWriter, req *http.Request) { http.DefaultServeMux.ServeHTTP(w, req) }) r.Group("/proxy", func(r martini.Router) { r.Get("", api.Overview) r.Get("/model", api.Model) r.Get("/stats", api.StatsNoXAuth) r.Get("/slots", api.SlotsNoXAuth) }) r.Group("/api/proxy", func(r martini.Router) { r.Get("/model", api.Model) r.Get("/xping/:xauth", api.XPing) r.Get("/stats/:xauth", api.Stats) r.Get("/stats/:xauth/:flags", api.Stats) r.Get("/slots/:xauth", api.Slots) r.Put("/start/:xauth", api.Start) r.Put("/stats/reset/:xauth", api.ResetStats) r.Put("/forcegc/:xauth", api.ForceGC) r.Put("/shutdown/:xauth", api.Shutdown) r.Put("/loglevel/:xauth/:value", api.LogLevel) r.Put("/fillslots/:xauth", binding.Json([]*models.Slot{}), api.FillSlots) r.Put("/sentinels/:xauth", binding.Json(models.Sentinel{}), api.SetSentinels) r.Put("/sentinels/:xauth/rewatch", api.RewatchSentinels) }) m.MapTo(r, (*martini.Routes)(nil)) m.Action(r.Handle) return m }
  • apiServer提供http服務,包括服務端和客戶端Api,當codis-proxy啟動之後啟動http服務等待叢集配置命令的請求,具體的Api路徑在$GOPATH/src/github.com/CodisLabs/codis/pkg/proxy/proxy_api.go。

二、codis-proxy啟動redis協議服務處理redis命令

// NewSession wraps sock in a redis connection configured from config and
// returns a Session bound to it.
//
// The connection uses the session receive/send buffer sizes, read/write
// timeouts and keep-alive period from config. The returned session records
// its creation time in Unix seconds and starts with an empty per-op stats map.
func NewSession(sock net.Conn, config *Config) *Session {
    recvBuf := config.SessionRecvBufsize.AsInt()
    sendBuf := config.SessionSendBufsize.AsInt()

    conn := redis.NewConn(sock, recvBuf, sendBuf)
    conn.ReaderTimeout = config.SessionRecvTimeout.Duration()
    conn.WriterTimeout = config.SessionSendTimeout.Duration()
    conn.SetKeepAlivePeriod(config.SessionKeepAlivePeriod.Duration())

    // Conn and config are the core fields of the session.
    sess := &Session{
        Conn:       conn,
        config:     config,
        CreateUnix: time.Now().Unix(),
    }
    sess.stats.opmap = make(map[string]*opStats, 16)

    log.Infof("session [%p] create: %s", sess, sess)
    return sess
}

// Start launches the session's reader and writer loops against router d.
// The body is executed through s.start.Do.
//
// The session is rejected when the incremented session count exceeds
// config.ProxyMaxClients, or when the router is not online. On rejection an
// error reply is written and the session closed from a separate goroutine,
// and the session counter is decremented again.
func (s *Session) Start(d *Router) {
    s.start.Do(func() {
        switch {
        case int(incrSessions()) > s.config.ProxyMaxClients:
            go func() {
                s.Conn.Encode(redis.NewErrorf("ERR max number of clients reached"), true)
                s.CloseWithError(ErrTooManySessions)
            }()
            decrSessions()
            return
        case !d.isOnline():
            go func() {
                s.Conn.Encode(redis.NewErrorf("ERR router is not online"), true)
                s.CloseWithError(ErrRouterNotOnline)
            }()
            decrSessions()
            return
        }

        queue := NewRequestChanBuffer(1024)

        // Writer side: waits for results, merges them and replies; the
        // session counter is decremented once the writer loop returns.
        go func() {
            s.loopWriter(queue)
            decrSessions()
        }()

        // Reader side: reads commands and dispatches them to the backend;
        // the queue is closed once the reader loop returns.
        go func() {
            s.loopReader(queue, d)
            queue.Close()
        }()
    })
}
  • codis-proxy每接到一個redis客戶端連線都會建立一個獨立的session來處理該連線上的請求,下面給出proxy的實現原理。
  • 其中Rc對應s.loopReader負責請求讀取及請求分發,Wc相當於s.loopWriter負責等待處理結果並合併結果返回給客戶端,下面我們會根據r *Request, d *Router為主線進一步分析。
// loopReader decodes client commands from the session connection, dispatches
// each one through handleRequest, and pushes the request onto tasks.
//
// The loop ends when s.quit is set, when decoding fails, when tasks holds
// more than config.SessionMaxPipeline buffered requests, or when a request
// fails and config.SessionBreakOnFailure is set. The returned error is also
// passed to CloseReaderWithError on exit.
func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) {
    defer func() {
        s.CloseReaderWithError(err)
    }()

    failFast := s.config.SessionBreakOnFailure
    pipelineLimit := s.config.SessionMaxPipeline

    for !s.quit {
        multi, decodeErr := s.Conn.DecodeMultiBulk()
        if decodeErr != nil {
            return decodeErr
        }
        s.incrOpTotal()

        if tasks.Buffered() > pipelineLimit {
            return ErrTooManyPipelinedRequests
        }

        now := time.Now()
        s.LastOpUnix = now.Unix()
        s.Ops++

        req := &Request{}
        req.Multi = multi
        req.Batch = &sync.WaitGroup{}
        req.Database = s.database
        req.UnixNano = now.UnixNano()

        // handleRequest performs the command dispatch. A failed request is
        // still queued, carrying an error reply, so the writer answers it.
        handleErr := s.handleRequest(req, d)
        if handleErr != nil {
            req.Resp = redis.NewErrorf("ERR handle request, %s", handleErr)
        }
        tasks.PushBack(req)

        if handleErr != nil && failFast {
            return handleErr
        }
    }
    return nil
}
  • loopReader將請求分發給backend之後,把請求r放入tasks佇列;backend處理完成後結果會寫入r,loopWriter再從tasks中取出r並等待其結果。
// handleRequest classifies the command in r and routes it to the matching
// handler, or to the router's generic dispatch.
//
// It fills r.OpStr, r.OpFlag and r.Broken, and rejects commands whose flag
// reports them as not allowed. QUIT and AUTH are handled before the
// authorization check. An unauthorized session gets a NOAUTH reply when
// config.ProductAuth is non-empty; otherwise it is marked authorized.
func (s *Session) handleRequest(r *Request, d *Router) error {
    opstr, flag, err := getOpInfo(r.Multi)
    if err != nil {
        return err
    }
    r.OpStr, r.OpFlag, r.Broken = opstr, flag, &s.broken

    if flag.IsNotAllowed() {
        return fmt.Errorf("command '%s' is not allowed", opstr)
    }

    // Commands that are served regardless of authorization state.
    if opstr == "QUIT" {
        return s.handleQuit(r)
    }
    if opstr == "AUTH" {
        return s.handleAuth(r)
    }

    switch {
    case s.authorized:
        // Already authorized; nothing to check.
    case s.config.ProductAuth != "":
        r.Resp = redis.NewErrorf("NOAUTH Authentication required")
        return nil
    default:
        s.authorized = true
    }

    switch opstr {
    case "SELECT":
        return s.handleSelect(r)
    case "PING":
        return s.handleRequestPing(r, d)
    case "INFO":
        return s.handleRequestInfo(r, d)
    case "MGET":
        return s.handleRequestMGet(r, d)
    case "MSET":
        return s.handleRequestMSet(r, d)
    case "DEL":
        return s.handleRequestDel(r, d)
    case "EXISTS":
        return s.handleRequestExists(r, d)
    case "SLOTSINFO":
        return s.handleRequestSlotsInfo(r, d)
    case "SLOTSSCAN":
        return s.handleRequestSlotsScan(r, d)
    case "SLOTSMAPPING":
        return s.handleRequestSlotsMapping(r, d)
    }
    return d.dispatch(r)
}
  • 該方法比較好理解具體命令的處理,下面以MGET為例
// handleRequestMGet serves an MGET command.
//
// With no keys it sets an argument-count error reply on r. With exactly one
// key the request is dispatched unchanged. Otherwise the request is split
// into one sub-request per key, each sub-request is dispatched on its own,
// and r.Coalesce is set to a function that merges the sub-replies into a
// single array reply on r.Resp.
func (s *Session) handleRequestMGet(r *Request, d *Router) error {
    nkeys := len(r.Multi) - 1
    if nkeys == 0 {
        r.Resp = redis.NewErrorf("ERR wrong number of arguments for 'MGET' command")
        return nil
    }
    if nkeys == 1 {
        return d.dispatch(r) // single key: dispatch the request as is
    }

    // Split into one sub-request per key: [command, key].
    sub := r.MakeSubRequest(nkeys)
    for i := range sub {
        sub[i].Multi = []*redis.Resp{r.Multi[0], r.Multi[i+1]}
        if err := d.dispatch(&sub[i]); err != nil {
            return err
        }
    }

    // Result-merging hook; handleResponse calls it when it is non-nil.
    r.Coalesce = func() error {
        merged := make([]*redis.Resp, len(sub))
        for i := range sub {
            if err := sub[i].Err; err != nil {
                return err
            }
            resp := sub[i].Resp
            if resp == nil {
                return ErrRespIsRequired
            }
            if !resp.IsArray() || len(resp.Array) != 1 {
                return fmt.Errorf("bad mget resp: %s array.len = %d", resp.Type, len(resp.Array))
            }
            merged[i] = resp.Array[0]
        }
        r.Resp = redis.NewArray(merged) // stored on r.Resp for the writer loop
        return nil
    }
    return nil
}
  • 子請求拆分及方法繫結先不討論,我們重點看一下d.dispatch,它是命令分發的重要環節,可以將命令分發到具體的codis-server上:
// dispatch picks the slot for r by hashing its hash key modulo MaxSlotNum
// and forwards the request through that slot.
// NOTE(review): the original annotation says MaxSlotNum is 1024 — confirm
// against the constant's definition.
func (s *Router) dispatch(r *Request) error {
    hkey := getHashKey(r.Multi, r.OpStr)
    // The forwarding method is bound to the slot itself.
    slot := &s.slots[Hash(hkey)%MaxSlotNum]
    return slot.forward(r, hkey)
}
// forward hands r to the slot's configured forwarding method, passing the
// slot itself and the request's hash key.
func (s *Slot) forward(r *Request, hkey []byte) error {
    method := s.method
    return method.Forward(s, r, hkey)
}
// Forward resolves the backend connection for r while holding the slot's
// read lock, then queues r on that connection after the lock is released.
// An error from process is returned without queueing the request.
func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error {
    s.lock.RLock()
    conn, err := d.process(s, r, hkey) // the BackendConn that will serve r
    s.lock.RUnlock()

    if err == nil {
        conn.PushBack(r) // queue the request on the BackendConn
    }
    return err
}
// process selects the BackendConn that should serve r for slot s.
//
// It returns ErrSlotIsNotReady when the slot has no backend connection.
// When the slot has a migration source and hkey is non-empty, slotsmgrt is
// called for the key first and its error, if any, is returned. On success
// the request is attached to the slot's reference group (r.Group, counter
// incremented by one) and the connection chosen by forward2 is returned.
func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
    if s.backend.bc == nil {
        log.Debugf("slot-%04d is not ready: hash key = '%s'", s.id, hkey)
        return nil, ErrSlotIsNotReady
    }

    migrating := s.migrate.bc != nil && len(hkey) != 0
    if migrating {
        err := d.slotsmgrt(s, hkey, r.Database, r.Seed16())
        if err != nil {
            log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
                s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
            return nil, err
        }
    }

    r.Group = &s.refs
    r.Group.Add(1)
    // NOTE(review): the original author remarks that a BackendConn can come
    // from three different places and that the distinction is unclear.
    return d.forward2(s, r), nil
}
// forward2 returns the backend connection that should serve r.
//
// When the slot is not migrating, the request is read-only and the slot has
// replica groups, each group is probed in order. Probing starts at the entry
// after the request's seed (modulo the group size) and visits each entry
// once; the first non-nil connection wins. Otherwise, or if no replica
// yields a connection, the slot's primary backend is used.
// NOTE(review): the final boolean passed to BackendConn is false for
// replicas and true for the primary — confirm its meaning in its definition.
func (d *forwardHelper) forward2(s *Slot, r *Request) *BackendConn {
    database, seed := r.Database, r.Seed16()

    useReplica := s.migrate.bc == nil && r.IsReadOnly() && len(s.replicaGroups) != 0
    if useReplica {
        for _, group := range s.replicaGroups {
            idx := seed
            for range group {
                idx = (idx + 1) % uint(len(group))
                candidate := group[idx].BackendConn(database, seed, false)
                if candidate != nil {
                    return candidate
                }
            }
        }
    }
    return s.backend.bc.BackendConn(database, seed, true)
}
  • 上述邏輯主要就是先從槽位Slot裡獲取BackendConn處理例項,然後把請求r *Request放入BackendConn的處理佇列input(chan *Request,在for迴圈中chan為空不會終止迴圈而是會等待新的請求進入)。那麼存在一個問題:BackendConn是在哪裡建立的?經過分析得知,BackendConn同樣存在loopWriter和loopReader,而它的啟動見下面的程式:
// NewBackendConn creates a BackendConn for addr and database and starts its
// run loop in a new goroutine.
//
// The input queue is a channel buffered to 1024 requests. The retry delay is
// a DelayExp2 with Min 50 and Max 5000 in units of one millisecond.
func NewBackendConn(addr string, database int, config *Config) *BackendConn {
    conn := &BackendConn{addr: addr, config: config, database: database}
    conn.input = make(chan *Request, 1024)
    conn.retry.delay = &DelayExp2{
        Min:  50,
        Max:  5000,
        Unit: time.Millisecond,
    }

    go conn.run()
    return conn
}
// run is the service loop of a backend connection. Until bc.closed becomes
// true it keeps calling loopWriter, numbering each attempt with an
// increasing round counter. After a round that returns an error it calls
// delayBeforeRetry before starting the next round.
func (bc *BackendConn) run() {
    log.Warnf("backend conn [%p] to %s, db-%d start service",
        bc, bc.addr, bc.database)

    round := 0
    for bc.closed.IsFalse() {
        log.Warnf("backend conn [%p] to %s, db-%d round-[%d]",
            bc, bc.addr, bc.database, round)
        // loopWriter is the backend-facing writer side.
        if err := bc.loopWriter(round); err != nil {
            bc.delayBeforeRetry()
        }
        round++
    }

    log.Warnf("backend conn [%p] to %s, db-%d stop and exit",
        bc, bc.addr, bc.database)
}

loopWriter->newBackendReader:
// newBackendReader dials the backend at bc.addr (5 second dial timeout),
// applies the backend timeouts and keep-alive period from config, runs
// verifyAuth and selectDatabase on the new connection, and starts loopReader
// on it in a new goroutine.
//
// It returns the connection and the send side of the task channel that
// loopReader consumes; the channel is buffered to config.BackendMaxPipeline.
// If verifyAuth or selectDatabase fails, the connection is closed and the
// error returned.
func (bc *BackendConn) newBackendReader(round int, config *Config) (*redis.Conn, chan<- *Request, error) {
    recvBuf := config.BackendRecvBufsize.AsInt()
    sendBuf := config.BackendSendBufsize.AsInt()

    conn, err := redis.DialTimeout(bc.addr, time.Second*5, recvBuf, sendBuf)
    if err != nil {
        return nil, nil, err
    }
    conn.ReaderTimeout = config.BackendRecvTimeout.Duration()
    conn.WriterTimeout = config.BackendSendTimeout.Duration()
    conn.SetKeepAlivePeriod(config.BackendKeepAlivePeriod.Duration())

    err = bc.verifyAuth(conn, config.ProductAuth)
    if err == nil {
        err = bc.selectDatabase(conn, bc.database)
    }
    if err != nil {
        conn.Close()
        return nil, nil, err
    }

    pending := make(chan *Request, config.BackendMaxPipeline)
    go bc.loopReader(pending, conn, round) // backend-facing reader side
    return conn, pending, nil
}
  • 從上面的程式碼基本可以看出一個執行鏈:NewBackendConn->run->loopWriter->newBackendReader->loopReader,不過還有個疑惑,沒有看到NewBackendConn的呼叫處,實際上他的呼叫源頭在$GOPATH/src/github.com/CodisLabs/codis/pkg/proxy/router.go的fillSlot裡面:
// fillSlot rebinds slot m.Id of the router to the backends described by m.
//
// It first releases every connection the slot currently holds (backend,
// migration source and all replica groups) and clears those fields, then
// retains fresh connections from the router's pools according to m.
// The remainder of the function is elided in this excerpt ("...").
func (s *Router) fillSlot(m *models.Slot, switched bool, method forwardMethod) {
    slot := &s.slots[m.Id]
    // NOTE(review): presumably blocks new forwards and waits for in-flight
    // requests on this slot — confirm against Slot.blockAndWait.
    slot.blockAndWait()

    // Drop the old bindings. Release is called unconditionally here, so it
    // presumably tolerates a nil receiver — TODO confirm.
    slot.backend.bc.Release()
    slot.backend.bc = nil
    slot.backend.id = 0
    slot.migrate.bc.Release()
    slot.migrate.bc = nil
    slot.migrate.id = 0
    for i := range slot.replicaGroups {
        for _, bc := range slot.replicaGroups[i] {
            bc.Release()
        }
    }
    slot.replicaGroups = nil

    slot.switched = switched

    // Bind the new primary backend and, if present, the migration source;
    // both come from the primary pool.
    if addr := m.BackendAddr; len(addr) != 0 {
        slot.backend.bc = s.pool.primary.Retain(addr)
        slot.backend.id = m.BackendAddrGroupId
    }
    if from := m.MigrateFrom; len(from) != 0 {
        slot.migrate.bc = s.pool.primary.Retain(from)
        slot.migrate.id = m.MigrateFromGroupId
    }
    // Replica connections come from the replica pool and are only set up
    // when BackendPrimaryOnly is off; empty groups are skipped.
    if !s.config.BackendPrimaryOnly {
        for i := range m.ReplicaGroups {
            var group []*sharedBackendConn
            for _, addr := range m.ReplicaGroups[i] {
                group = append(group, s.pool.replica.Retain(addr))
            }
            if len(group) == 0 {
                continue
            }
            slot.replicaGroups = append(slot.replicaGroups, group)
        }
    }
    ...
}
// Retain returns the shared backend connection for addr. If the pool already
// has a non-nil entry for addr, that entry's own Retain result is returned;
// otherwise a new shared connection is created, stored in the pool under
// addr, and returned.
func (p *sharedBackendConnPool) Retain(addr string) *sharedBackendConn {
    if existing := p.pool[addr]; existing != nil {
        return existing.Retain()
    }
    created := newSharedBackendConn(addr, p)
    p.pool[addr] = created
    return created
}
// newSharedBackendConn builds the shared connection set for addr, owned by
// pool, with an initial reference count of 1.
//
// For every database index below pool.config.BackendNumberDatabases it
// creates pool.parallel BackendConn instances. When pool.parallel is 1 it
// also fills the single slice with the only connection of each database.
// A failure to split addr into host and port is logged, and construction
// continues with whatever host and port values SplitHostPort returned.
func newSharedBackendConn(addr string, pool *sharedBackendConnPool) *sharedBackendConn {
    host, port, err := net.SplitHostPort(addr)
    if err != nil {
        log.ErrorErrorf(err, "split host-port failed, address = %s", addr)
    }

    shared := &sharedBackendConn{
        addr: addr,
        host: []byte(host),
        port: []byte(port),
    }
    shared.owner = pool
    shared.conns = make([][]*BackendConn, pool.config.BackendNumberDatabases)

    for db := range shared.conns {
        workers := make([]*BackendConn, pool.parallel)
        for w := range workers {
            workers[w] = NewBackendConn(addr, db, pool.config) // create the BackendConn
        }
        shared.conns[db] = workers
    }

    if pool.parallel == 1 {
        shared.single = make([]*BackendConn, len(shared.conns))
        for db, workers := range shared.conns {
            shared.single[db] = workers[0]
        }
    }

    shared.refcnt = 1
    return shared
}
  • 而fillSlot的呼叫在叢集啟動的槽位分配操作或者後續槽位調整操作裡面來做,至此我們簡單分析一下backend裡面的loopWriter和loopReader
// loopWriter runs one connection round for the backend: it opens a backend
// connection (which also starts the matching loopReader), then encodes every
// request arriving on bc.input to that connection and hands it to the reader
// through tasks.
//
// Deferred calls run in reverse order of registration on exit: the state is
// reset to 0, tasks is closed, and finally the requests still buffered in
// bc.input are failed with ErrBackendConnReset and the exit is logged.
// The returned error is the named result err, which the exit log also reads.
func (bc *BackendConn) loopWriter(round int) (err error) {
    defer func() {
        // Fail only the requests buffered at this moment; len is sampled
        // once, so requests queued afterwards stay in bc.input.
        for i := len(bc.input); i != 0; i-- {
            r := <-bc.input
            bc.setResponse(r, nil, ErrBackendConnReset)
        }
        log.WarnErrorf(err, "backend conn [%p] to %s, db-%d writer-[%d] exit",
            bc, bc.addr, bc.database, round)
    }()
    c, tasks, err := bc.newBackendReader(round, bc.config)    // open the backend connection and start its reader
    if err != nil {
        return err
    }
    defer close(tasks)

    defer bc.state.Set(0)

    // Connected: mark the state and reset the retry bookkeeping.
    bc.state.Set(stateConnected)
    bc.retry.fails = 0
    bc.retry.delay.Reset()

    p := c.FlushEncoder()
    p.MaxInterval = time.Millisecond
    p.MaxBuffered = math2.MinInt(256, cap(tasks))

    for r := range bc.input {              // move requests into tasks; blocks while bc.input is empty
        // Read-only requests already marked broken are answered without
        // being sent to the backend.
        if r.IsReadOnly() && r.IsBroken() {
            bc.setResponse(r, nil, ErrRequestIsBroken)
            continue
        }
        if err := p.EncodeMultiBulk(r.Multi); err != nil {
            return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
        }
        // The flush argument is true when no further request is queued.
        if err := p.Flush(len(bc.input) == 0); err != nil {
            return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
        } else {
            tasks <- r
        }
    }
    return nil
}

// newBackendReader dials the backend at bc.addr (5 second dial timeout),
// applies the backend timeouts and keep-alive period from config, runs
// verifyAuth and selectDatabase on the new connection, and starts loopReader
// on it in a new goroutine.
//
// It returns the connection and the send side of the task channel that
// loopReader consumes; the channel is buffered to config.BackendMaxPipeline.
// If verifyAuth or selectDatabase fails, the connection is closed and the
// error returned.
func (bc *BackendConn) newBackendReader(round int, config *Config) (*redis.Conn, chan<- *Request, error) {
    recvBuf := config.BackendRecvBufsize.AsInt()
    sendBuf := config.BackendSendBufsize.AsInt()

    // Open the connection to the redis backend.
    conn, err := redis.DialTimeout(bc.addr, time.Second*5, recvBuf, sendBuf)
    if err != nil {
        return nil, nil, err
    }
    conn.ReaderTimeout = config.BackendRecvTimeout.Duration()
    conn.WriterTimeout = config.BackendSendTimeout.Duration()
    conn.SetKeepAlivePeriod(config.BackendKeepAlivePeriod.Duration())

    // Authenticate first, then select the database for this connection.
    err = bc.verifyAuth(conn, config.ProductAuth)
    if err == nil {
        err = bc.selectDatabase(conn, bc.database)
    }
    if err != nil {
        conn.Close()
        return nil, nil, err
    }

    pending := make(chan *Request, config.BackendMaxPipeline)
    go bc.loopReader(pending, conn, round) // reads replies and writes them back onto requests

    return conn, pending, nil
}
// loopReader consumes requests from tasks, decodes one backend reply per
// request from c, and stores the reply on the request via setResponse.
//
// A decode error ends the loop; the current request is failed with a
// "backend conn failure" error. An error reply whose value starts with
// errMasterDown attempts to move the connection state from stateConnected to
// stateDataStale, and logs when that transition succeeds. On exit the
// connection is closed and every request still arriving on tasks is failed
// with ErrBackendConnReset.
func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round int) (err error) {
    defer func() {
        c.Close()
        for pending := range tasks {
            bc.setResponse(pending, nil, ErrBackendConnReset)
        }
        log.WarnErrorf(err, "backend conn [%p] to %s, db-%d reader-[%d] exit",
            bc, bc.addr, bc.database, round)
    }()

    // Runs until tasks is closed and drained.
    for req := range tasks {
        resp, decodeErr := c.Decode() // decode the backend reply
        if decodeErr != nil {
            return bc.setResponse(req, nil, fmt.Errorf("backend conn failure, %s", decodeErr))
        }

        masterDown := resp != nil && resp.IsError() && bytes.HasPrefix(resp.Value, errMasterDown)
        if masterDown && bc.state.CompareAndSwap(stateConnected, stateDataStale) {
            log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'",
                bc, bc.addr, bc.database)
        }

        bc.setResponse(req, resp, nil) // attach the reply to the request
    }
    return nil
}
  • 最後我們看看s.loopWriter是如何讀取結果並返回給客戶端的:
// loopWriter pops requests from tasks in order, obtains each request's reply
// through handleResponse and writes it to the client connection.
//
// A handleResponse failure is turned into an "ERR handle response" reply.
// With config.SessionBreakOnFailure set, that reply is sent immediately and
// the loop ends. Encode or flush errors also end the loop. On exit the
// session is closed with the returned error, every request left in tasks is
// counted as failed, and the op stats are flushed.
func (s *Session) loopWriter(tasks *RequestChan) (err error) {
    defer func() {
        s.CloseWithError(err)
        tasks.PopFrontAllVoid(func(r *Request) {
            s.incrOpFails(r, nil)
        })
        s.flushOpStats(true)
    }()

    failFast := s.config.SessionBreakOnFailure

    enc := s.Conn.FlushEncoder()
    enc.MaxInterval = time.Millisecond
    enc.MaxBuffered = 256

    writeOne := func(r *Request) error {
        resp, respErr := s.handleResponse(r) // obtain the reply for r
        if respErr != nil {
            resp = redis.NewErrorf("ERR handle response, %s", respErr)
            if failFast {
                s.Conn.Encode(resp, true)
                return s.incrOpFails(r, respErr)
            }
        }
        if encErr := enc.Encode(resp); encErr != nil {
            return s.incrOpFails(r, encErr)
        }

        // The flush argument is true when the queue is currently empty.
        drained := tasks.IsEmpty()
        if flushErr := enc.Flush(drained); flushErr != nil {
            return s.incrOpFails(r, flushErr)
        }
        s.incrOpStats(r, resp.Type)

        if drained {
            s.flushOpStats(false)
        }
        return nil
    }
    return tasks.PopFrontAll(writeOne)
}
// PopFrontAll repeatedly pops the front request and passes it to onRequest.
// It returns the first non-nil error from onRequest, or nil once PopFront
// reports that no request is available.
func (c *RequestChan) PopFrontAll(onRequest func(r *Request) error) error {
    for {
        req, ok := c.PopFront()
        if !ok {
            return nil
        }
        if err := onRequest(req); err != nil {
            return err
        }
    }
}
// handleResponse waits on r.Batch and then produces the reply for r.
//
// If r.Coalesce is set it is called to merge the backend results, and its
// error is returned. Otherwise the request's own error is returned if
// present; a request with neither an error nor a reply yields
// ErrRespIsRequired.
func (s *Session) handleResponse(r *Request) (*redis.Resp, error) {
    r.Batch.Wait()

    if r.Coalesce != nil {
        // Merge the results returned by the backends.
        if err := r.Coalesce(); err != nil {
            return nil, err
        }
    }

    err := r.Err
    switch {
    case err != nil:
        return nil, err
    case r.Resp == nil:
        return nil, ErrRespIsRequired
    }
    return r.Resp, nil
}
  • 函式Coalesce()是前面提到在函式func (s *Session) handleRequestMGet(r *Request, d *Router) error 裡面繫結的結果合併函式。至此codis-proxy的主要實現原理基本分析完成