Implementation of Raft joint consensus in etcd

Joint consensus

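Joint consensus happens in two phases: the cluster first switches to a transitional configuration we call joint consensus; once the joint consensus has been committed, the system then transitions to the new configuration. The joint consensus combines both the old and new configurations.

This makes the idea intuitive. The joint consensus configuration contains both the old and the new members. Once a membership change puts the cluster into the joint consensus phase, the majority rule used by log replication requires agreement from a majority of the old configuration and, separately, from a majority of the new configuration. While in joint consensus, applying the membership change itself also needs that majority agreement, because the change is distributed through log replication.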
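To make the "separate majorities" rule concrete, here is a minimal Go sketch. The types majorityConfig and jointConfig and the method hasQuorum are hypothetical simplifications, not etcd's quorum package, although the [incoming, outgoing] layout and the convention that an empty half always agrees follow etcd's quorum package as I understand it.

```go
package main

import "fmt"

// majorityConfig is a hypothetical stand-in for etcd's quorum.MajorityConfig:
// the set of voter IDs in one configuration.
type majorityConfig map[uint64]struct{}

// hasQuorum reports whether the acknowledging nodes form a strict majority
// of c. An empty config counts as agreeing, so a joint config whose outgoing
// half is empty behaves like a plain majority config.
func (c majorityConfig) hasQuorum(acks map[uint64]bool) bool {
	if len(c) == 0 {
		return true
	}
	n := 0
	for id := range c {
		if acks[id] {
			n++
		}
	}
	return n > len(c)/2
}

// jointConfig uses the layout described in this post:
// [0] is incoming (new voters), [1] is outgoing (old voters).
type jointConfig [2]majorityConfig

// hasQuorum requires a separate majority in each half.
func (j jointConfig) hasQuorum(acks map[uint64]bool) bool {
	return j[0].hasQuorum(acks) && j[1].hasQuorum(acks)
}

func main() {
	oldVoters := majorityConfig{1: {}, 2: {}, 3: {}}
	newVoters := majorityConfig{3: {}, 4: {}, 5: {}}
	joint := jointConfig{newVoters, oldVoters}

	// Every old voter agrees, but only node 3 of the new voters: not enough.
	fmt.Println(joint.hasQuorum(map[uint64]bool{1: true, 2: true, 3: true}))
	// Two of three agree in each half: enough.
	fmt.Println(joint.hasQuorum(map[uint64]bool{1: true, 2: true, 4: true, 5: true}))
}
```

With old voters {1,2,3} and new voters {3,4,5}, the first call prints false and the second prints true: the old majority alone cannot commit anything while the cluster is joint.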

Raft log replication flow

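The figure above is a simplified view of how raft commits entries in etcd. The steps are not numbered because every part runs in its own loop.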

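Interestingly, as I understand it, joint consensus is implemented much like an application of log replication: the membership-change information carried by joint consensus is written as the content of a log entry and applied to every raft member through log replication. So one can say that joint consensus is built on raft log replication.

If you treat the raft module as a whole, you could even partly call this bootstrapping, because it changes its own membership and thereby changes the policy that decides whom raft replicates messages to.

It is also fair to call it an application of log replication: open the raft module up as a white box, and treat log replication as one submodule and joint consensus as another.

It is equally valid to say that it is simply part of the raft module itself, without treating joint consensus as a separate addition.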

Implementation in etcd

The entry types used to start and end joint consensus are defined in the pb package (raftpb):

const (
   EntryNormal       EntryType = 0
   EntryConfChange   EntryType = 1
   EntryConfChangeV2 EntryType = 2
)
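Since a conf change is just a log entry with a different EntryType, the apply side only has to dispatch on the type. The sketch below reuses the three constants above; the entry struct and applyCommitted are hypothetical simplifications rather than etcd's code, and the conf change branch only records the entry instead of unmarshalling it and handing it to raft.

```go
package main

import "fmt"

// EntryType mirrors the raftpb constants quoted above.
type EntryType int32

const (
	EntryNormal       EntryType = 0
	EntryConfChange   EntryType = 1
	EntryConfChangeV2 EntryType = 2
)

// entry is a hypothetical simplification of raftpb.Entry.
type entry struct {
	Index uint64
	Type  EntryType
	Data  []byte
}

// applyCommitted walks committed entries in log order. Conf changes travel
// through the same log as normal entries; only the apply step differs.
func applyCommitted(ents []entry) []string {
	var applied []string
	for _, e := range ents {
		switch e.Type {
		case EntryNormal:
			applied = append(applied, fmt.Sprintf("%d:normal", e.Index))
		case EntryConfChange, EntryConfChangeV2:
			// A real node would unmarshal e.Data and pass the result to
			// raft's ApplyConfChange here.
			applied = append(applied, fmt.Sprintf("%d:confchange", e.Index))
		}
	}
	return applied
}

func main() {
	ents := []entry{
		{Index: 5, Type: EntryNormal},
		{Index: 6, Type: EntryConfChange},
		{Index: 7, Type: EntryConfChangeV2},
	}
	fmt.Println(applyCommitted(ents)) // [5:normal 6:confchange 7:confchange]
}
```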

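The joint consensus process works like this: the conf change is first passed into the raft module through a proposal (ProposeConfChange) as an EntryConfChange entry. The leader appends it to its own log and sends it to the other nodes like any other entry. Once a majority has acknowledged it, the entry is committed, and only then do the nodes (leader, followers and learners) apply the membership change. If the change left the configuration in a joint state with AutoLeave set, the leader, when it applies the entry, appends an EntryConfChangeV2 entry that ends the membership change.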

Structure of joint consensus

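The phase-transition logic of joint consensus is encapsulated in confchange (its Changer type).

JointConfig is an array of two MajorityConfig maps: one holds the changed (new) voter set and the other holds the old voter set.

Following the reference relationships in the figure above, the logic is ultimately assembled in confchange.

confchange calls the changed voter set incoming and the old voter set outgoing: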

func incoming(voters quorum.JointConfig) quorum.MajorityConfig      { return voters[0] }
func outgoing(voters quorum.JointConfig) quorum.MajorityConfig      { return voters[1] }
func outgoingPtr(voters *quorum.JointConfig) *quorum.MajorityConfig { return &voters[1] }

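When no membership change is in progress, the voters are stored in JointConfig[0], the set that incoming points to, and outgoing is empty.

On EnterJoint, the current voters are copied into outgoing, and the requested changes are then applied to incoming, so incoming becomes the new voter set.

On LeaveJoint, outgoing is cleared, the nodes that were only in outgoing (the ones going offline) are removed, and the nodes that stay remain in incoming, which already holds the new voter set. This completes the membership change.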

Logic flow of joint consensus

Proposal trigger

In EtcdServer, AddMember(), RemoveMember() and the two other methods that modify members (UpdateMember() and PromoteMember()) all go through the configure() function:

// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It
// will block until the change is performed or there is an error.
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
   lg := s.Logger()
   cc.ID = s.reqIDGen.Next()
   ch := s.w.Register(cc.ID)

   start := time.Now()
   if err := s.r.ProposeConfChange(ctx, cc); err != nil {
      s.w.Trigger(cc.ID, nil)
      return nil, err
   }

   select {
   case x := <-ch:
      if x == nil {
         lg.Panic("failed to configure")
      }
      resp := x.(*confChangeResponse)
      lg.Info(
         "applied a configuration change through raft",
         zap.String("local-member-id", s.ID().String()),
         zap.String("raft-conf-change", cc.Type.String()),
         zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
      )
      return resp.membs, resp.err

   case <-ctx.Done():
      s.w.Trigger(cc.ID, nil) // GC wait
      return nil, s.parseProposeCtxErr(ctx.Err(), start)

   case <-s.stopping:
      return nil, ErrStopped
   }
}

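s.r.ProposeConfChange(ctx, cc) proposes the config change to raft.

This function also contains an interesting piece of logic: it uses a channel to wait until the operation has completed before returning the result, much like the implementation of read consistency.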

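// Register a channel, keyed by the request ID, in the wait registry
ch := s.w.Register(cc.ID)
// Send a value into that channel (configure() sends nil on the error and
// ctx.Done() paths; on success the value received is a *confChangeResponse)
s.w.Trigger(cc.ID, nil)

select {
   // Block until a value arrives on that channel
   case x := <-ch:
      if x == nil {
         lg.Panic("failed to configure")
      }
...
}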
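The Register/Trigger pattern can be reproduced with a mutex-protected map of one-slot buffered channels. This is a minimal sketch in the spirit of etcd's wait package, not its actual code; waitRegistry and newWaitRegistry are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// waitRegistry: callers register an ID, and whoever finishes the work
// triggers that ID with a result.
type waitRegistry struct {
	mu sync.Mutex
	m  map[uint64]chan interface{}
}

func newWaitRegistry() *waitRegistry {
	return &waitRegistry{m: make(map[uint64]chan interface{})}
}

// Register returns a channel that receives exactly one value for id.
func (w *waitRegistry) Register(id uint64) <-chan interface{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	if _, ok := w.m[id]; ok {
		panic(fmt.Sprintf("duplicate wait id %d", id))
	}
	ch := make(chan interface{}, 1) // buffered so Trigger never blocks
	w.m[id] = ch
	return ch
}

// Trigger delivers x to the waiter for id, if any, and forgets the id.
// Triggering an unknown id is a no-op.
func (w *waitRegistry) Trigger(id uint64, x interface{}) {
	w.mu.Lock()
	ch, ok := w.m[id]
	delete(w.m, id)
	w.mu.Unlock()
	if ok {
		ch <- x
		close(ch)
	}
}

func main() {
	w := newWaitRegistry()
	ch := w.Register(42)
	// Stand-in for the apply path, which runs on another goroutine.
	go w.Trigger(42, "conf change applied")
	fmt.Println(<-ch) // conf change applied

	// Cancellation path: trigger with nil so the entry is cleaned up.
	ch2 := w.Register(43)
	w.Trigger(43, nil)
	fmt.Println(<-ch2 == nil) // true
}
```

The one-slot buffer lets Trigger hand off the result even if the waiter has not reached its select yet, and deleting the id on Trigger is what makes the "GC wait" call in configure() meaningful.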

Proposal logic of Leader

When the leader receives a proposal, it handles each conf change entry as follows:

e := &m.Entries[i]
var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
   var ccc pb.ConfChange
   if err := ccc.Unmarshal(e.Data); err != nil {
      panic(err)
   }
   cc = ccc
} else if e.Type == pb.EntryConfChangeV2 {
   var ccc pb.ConfChangeV2
   if err := ccc.Unmarshal(e.Data); err != nil {
      panic(err)
   }
   cc = ccc
}
if cc != nil {
   alreadyPending := r.pendingConfIndex > r.raftLog.applied
   alreadyJoint := len(r.prs.Config.Voters[1]) > 0
   wantsLeaveJoint := len(cc.AsV2().Changes) == 0

   var refused string
   if alreadyPending {
      refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
   } else if alreadyJoint && !wantsLeaveJoint {
      refused = "must transition out of joint config first"
   } else if !alreadyJoint && wantsLeaveJoint {
      refused = "not in joint state; refusing empty conf change"
   }

   if refused != "" {
      r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
      m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
   } else {
      r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
   }
}

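The change is refused if an earlier conf change has not been applied yet, if the configuration is already joint and this proposal is not a request to leave it, or if it is an empty change while the configuration is not joint. A refused entry is not dropped: it is replaced with an empty EntryNormal entry. Otherwise pendingConfIndex is set to the index this entry will occupy in the log.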

After this processing, the entries wait to be sent out to the other nodes.
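The leader-side checks above boil down to a pure function of three facts. refusalReason is a hypothetical helper that restates them outside the raft struct; a non-empty result means the entry would be replaced with an empty EntryNormal, and numChanges == 0 stands for an empty change, i.e. a request to leave the joint config.

```go
package main

import "fmt"

// refusalReason restates the checks from the leader's proposal handling.
// It returns "" when the conf change is accepted.
func refusalReason(pendingConfIndex, applied uint64, alreadyJoint bool, numChanges int) string {
	alreadyPending := pendingConfIndex > applied
	wantsLeaveJoint := numChanges == 0
	switch {
	case alreadyPending:
		return fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", pendingConfIndex, applied)
	case alreadyJoint && !wantsLeaveJoint:
		return "must transition out of joint config first"
	case !alreadyJoint && wantsLeaveJoint:
		return "not in joint state; refusing empty conf change"
	}
	return ""
}

func main() {
	// An earlier conf change at index 10 has not been applied yet.
	fmt.Println(refusalReason(10, 8, false, 1))
	// Joint config, and the proposal is another change rather than a leave.
	fmt.Println(refusalReason(10, 10, true, 1))
	// Joint config and an empty change: this is the leave request, accepted.
	fmt.Printf("%q\n", refusalReason(10, 10, true, 0))
}
```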

Logic of apply

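Once a majority of nodes has acknowledged the entry and it is committed, it is handed to EtcdServer through Ready and the config is applied, exactly as any other log entry is applied.

The loop that passes Ready from the raft module to EtcdServer was described in the previous post on log replication.

Let's go straight to the code in the raft module.

Applying happens in two steps: first EtcdServer applies the raft log, then the advance() logic in the raft glue code runs.

For joint consensus, this means the node configuration is applied first, and advance() then starts the exit from joint consensus.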

apply config

func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
   cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
      changer := confchange.Changer{
         Tracker:   r.prs,
         LastIndex: r.raftLog.lastIndex(),
      }
      if cc.LeaveJoint() {
         return changer.LeaveJoint()
      } else if autoLeave, ok := cc.EnterJoint(); ok {
         return changer.EnterJoint(autoLeave, cc.Changes...)
      }
      return changer.Simple(cc.Changes...)
   }()

   if err != nil {
      // TODO(tbg): return the error to the caller.
      panic(err)
   }

   return r.switchToConfig(cfg, prs)
}

The logic is explained above, in the section on Changer.
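applyConfChange picks one of three Changer methods based on the shape of the ConfChangeV2. The sketch below restates that classification with hypothetical, simplified types (transition, confChangeV2, classify); it follows my reading of raftpb's LeaveJoint() and EnterJoint() helpers and ignores the Context field. One consequence, if I read etcd correctly: ConfChange.AsV2() turns a V1 ConfChange into a single change with the Auto transition, so such a change takes the Simple path rather than entering a joint config.

```go
package main

import "fmt"

// transition mirrors the three ConfChangeTransition values (hypothetical copy).
type transition int

const (
	transitionAuto transition = iota
	transitionJointImplicit
	transitionJointExplicit
)

// confChangeV2 keeps only the fields that drive the decision.
type confChangeV2 struct {
	Transition transition
	Changes    []string // simplified: one string per single change
}

// leaveJoint: a completely empty change means "leave the joint config".
func (c confChangeV2) leaveJoint() bool {
	return c.Transition == transitionAuto && len(c.Changes) == 0
}

// enterJoint: joint consensus is used for non-Auto transitions or whenever
// more than one change is batched; only the explicit transition disables
// auto-leave.
func (c confChangeV2) enterJoint() (autoLeave, ok bool) {
	if c.Transition != transitionAuto || len(c.Changes) > 1 {
		return c.Transition != transitionJointExplicit, true
	}
	return false, false
}

// classify reports which Changer method applyConfChange would call.
func classify(cc confChangeV2) string {
	if cc.leaveJoint() {
		return "LeaveJoint"
	} else if autoLeave, ok := cc.enterJoint(); ok {
		return fmt.Sprintf("EnterJoint(autoLeave=%t)", autoLeave)
	}
	return "Simple"
}

func main() {
	fmt.Println(classify(confChangeV2{Changes: []string{"add 4"}}))
	fmt.Println(classify(confChangeV2{Changes: []string{"add 4", "remove 1"}}))
	fmt.Println(classify(confChangeV2{Transition: transitionJointExplicit, Changes: []string{"add 4"}}))
	fmt.Println(classify(confChangeV2{}))
}
```

The four calls print Simple, EnterJoint(autoLeave=true), EnterJoint(autoLeave=false) and LeaveJoint; the last case is the empty entry that advance() proposes to leave the joint config.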

Completing joint consensus

func (r *raft) advance(rd Ready) {
   r.reduceUncommittedSize(rd.CommittedEntries)

   // If entries were applied (or a snapshot), update our cursor for
   // the next Ready. Note that if the current HardState contains a
   // new Commit index, this does not mean that we're also applying
   // all of the new entries due to commit pagination by size.
   if newApplied := rd.appliedCursor(); newApplied > 0 {
      oldApplied := r.raftLog.applied
      r.raftLog.appliedTo(newApplied)

      if r.prs.Config.AutoLeave && oldApplied <= r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader {
         // If the current (and most recent, at least for this leader's term)
         // configuration should be auto-left, initiate that now. We use a
         // nil Data which unmarshals into an empty ConfChangeV2 and has the
         // benefit that appendEntry can never refuse it based on its size
         // (which registers as zero).
         ent := pb.Entry{
            Type: pb.EntryConfChangeV2,
            Data: nil,
         }
         // There's no way in which this proposal should be able to be rejected.
         if !r.appendEntry(ent) {
            panic("refused un-refusable auto-leaving ConfChangeV2")
         }
         r.pendingConfIndex = r.raftLog.lastIndex()
         r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
      }
   }

   if len(rd.Entries) > 0 {
      e := rd.Entries[len(rd.Entries)-1]
      r.raftLog.stableTo(e.Index, e.Term)
   }
   if !IsEmptySnap(rd.Snapshot) {
      r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
   }
}

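If this node is the leader, the current joint configuration has AutoLeave set, and the applied index has just reached pendingConfIndex, the leader appends an empty EntryConfChangeV2 to its own log; it is then distributed through the Ready loop.

So joint consensus is also ended through log replication.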
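The auto-leave condition in advance() can be isolated as a predicate. shouldAutoLeave is a hypothetical helper, not part of etcd; it only restates the if-condition shown above, so appending the empty EntryConfChangeV2 itself is not modeled.

```go
package main

import "fmt"

// shouldAutoLeave is true when the applied index moves from at-or-below
// pendingConfIndex to at-or-above it, the joint config asked to be left
// automatically, and this node is the leader.
func shouldAutoLeave(autoLeave, isLeader bool, oldApplied, newApplied, pendingConfIndex uint64) bool {
	return autoLeave && isLeader &&
		oldApplied <= pendingConfIndex && newApplied >= pendingConfIndex
}

func main() {
	// The joint conf change sits at index 12; this Ready applies up to 15.
	fmt.Println(shouldAutoLeave(true, true, 10, 15, 12)) // true
	// Same entries on a follower: only the leader proposes the leave.
	fmt.Println(shouldAutoLeave(true, false, 10, 15, 12)) // false
	// Index 12 has not been applied yet.
	fmt.Println(shouldAutoLeave(true, true, 8, 11, 12)) // false
}
```

Only the leader ever proposes the leave entry; followers learn about it the same way they learn about any other entry, through replication.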

Summary

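I have not yet worked through the whole process with concrete scenarios. Once I finish the post on read consistency, I will verify it together with majority replication.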