Codis原始碼解析——sentinel的重同步(2)
Topom.ha.monitor本身相當於一個上帝視角的sentinel。它本身並不是一個實際的sentinel伺服器,但是它負責收集各個sentinel的監控資訊,並對叢集作出反饋。這一講我們就來看看Topom.ha.monitor。這一篇的原始碼也有助於大家理解併發模型中context的使用。
下面引數中的servers []string就是新增的sentinel的ip:port所組成的字串陣列,有多少個sentinel,陣列的長度就有多少
// rewatchSentinels tears down the current sentinel monitor (if any) and,
// when servers is non-empty, starts a fresh one watching the given
// sentinel addresses ("ip:port" strings).
func (s *Topom) rewatchSentinels(servers []string) {
	// Cancel the previous monitor first so its goroutines observe
	// IsCanceled() and exit before a new monitor is installed.
	if s.ha.monitor != nil {
		s.ha.monitor.Cancel()
		s.ha.monitor = nil
	}
	if len(servers) == 0 {
		// Nothing to watch: forget any previously elected masters.
		s.ha.masters = nil
	} else {
		// Create Topom's ha.monitor — the "god view" sentinel.
		s.ha.monitor = redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
		s.ha.monitor.LogFunc = log.Warnf
		s.ha.monitor.ErrFunc = log.WarnErrorf
		go func(p *redis.Sentinel) {
			// trigger carries "a +switch-master may have happened" signals
			// from the subscriber goroutine to the fetcher goroutine;
			// capacity 1 plus a non-blocking send coalesces bursts.
			var trigger = make(chan struct{}, 1)
			// delayUntil sleeps until deadline, waking at least once per
			// second (whichever is shorter) so cancellation is noticed
			// promptly; it returns immediately if deadline has passed.
			delayUntil := func(deadline time.Time) {
				// IsCanceled() reads Context.Done(): a readable Done
				// channel means this sentinel's context was canceled.
				for !p.IsCanceled() {
					var d = deadline.Sub(time.Now())
					if d <= 0 {
						return
					}
					time.Sleep(math2.MinDuration(d, time.Second))
				}
			}
			// Subscriber goroutine: keeps a SUBSCRIBE "+switch-master"
			// alive across the sentinels and pokes trigger on events.
			go func() {
				defer close(trigger)
				callback := func() {
					// Non-blocking send: drop the signal if one is
					// already pending.
					select {
					case trigger <- struct{}{}:
					default:
					}
				}
				for !p.IsCanceled() {
					timeout := time.Minute * 15
					retryAt := time.Now().Add(time.Second * 10)
					if !p.Subscribe(servers, timeout, callback) {
						// Subscribe failed or timed out: back off
						// before retrying.
						delayUntil(retryAt)
					} else {
						// A +switch-master was observed: wake the
						// fetcher goroutine.
						callback()
					}
				}
			}()
			// Fetcher goroutine: on each trigger, polls the sentinels
			// for the current masters and applies them to the groups.
			go func() {
				for _ = range trigger {
					var success int
					// Retry up to 10 times, stopping early after two
					// successful fetches or on cancellation.
					for i := 0; i != 10 && !p.IsCanceled() && success != 2; i++ {
						timeout := time.Second * 5
						masters, err := p.Masters(servers, timeout)
						if err != nil {
							log.WarnErrorf(err, "fetch group masters failed")
						} else {
							if !p.IsCanceled() {
								s.SwitchMasters(masters)
							}
							success += 1
						}
						delayUntil(time.Now().Add(time.Second * 5))
					}
				}
			}()
		}(s.ha.monitor)
	}
	log.Warnf("rewatch sentinels = %v", servers)
}
//一個context被取消的標準就是能從Context.Done()中讀出值
// IsCanceled reports whether this sentinel's context has been canceled,
// i.e. whether Context.Done() is readable.
func (s *Sentinel) IsCanceled() bool {
	select {
	case <-s.Context.Done():
	default:
		return false
	}
	return true
}
Subscribe是讓sentinel訂閱名為"+switch-master"的channel,並從這個channel中讀取主從切換的資訊。將訂閱成功與否寫到results := make(chan bool, len(sentinels))中,最後再遍歷results
//timeout為15min
// Subscribe asks every listed sentinel to SUBSCRIBE to "+switch-master"
// and blocks until one of them reports a master switch for this product
// (returns true), the context is canceled or the timeout expires, or so
// many subscriptions fail that a majority can no longer succeed (returns
// false). onMajoritySubscribed fires once, as soon as a majority of the
// sentinels have an active subscription.
//
// timeout is 15 minutes at the call site in rewatchSentinels.
func (s *Sentinel) Subscribe(sentinels []string, timeout time.Duration, onMajoritySubscribed func()) bool {
	cntx, cancel := context.WithTimeout(s.Context, timeout)
	defer cancel()
	// Give each per-connection dispatch a little slack beyond the
	// context deadline so the context, not the dial, times out first.
	timeout += time.Second * 5
	results := make(chan bool, len(sentinels))
	// Majority = strictly more than half of the sentinels.
	var majority = 1 + len(sentinels)/2
	var subscribed atomic2.Int64
	for i := range sentinels {
		go func(sentinel string) {
			notified, err := s.subscribeDispatch(cntx, sentinel, timeout, func() {
				// Fire exactly when the majority-th subscription is
				// established (Incr makes the count race-free).
				if subscribed.Incr() == int64(majority) {
					onMajoritySubscribed()
				}
			})
			if err != nil {
				s.errorf(err, "sentinel-[%s] subscribe failed", sentinel)
			}
			results <- notified
		}(sentinels[i])
	}
	// alive counts goroutines that may still deliver a result; it drops
	// by one each time a result is consumed.
	for alive := len(sentinels); ; alive-- {
		// Too few subscriptions left for a majority: give up early.
		if alive < majority {
			if cntx.Err() == nil {
				s.printf("sentinel subscribe lost majority (%d/%d)", alive, len(sentinels))
			}
			return false
		}
		select {
		case <-cntx.Done():
			// Canceled or deadline exceeded; only log explicit cancels.
			if cntx.Err() != context.DeadlineExceeded {
				s.printf("sentinel subscribe canceled (%v)", cntx.Err())
			}
			return false
		case notified := <-results:
			if notified {
				// One sentinel saw "+switch-master" for our product.
				s.printf("sentinel subscribe notified +switch-master")
				return true
			}
		}
	}
}
//訂閱"+switch-master"成功則返回true
func (s *Sentinel) subscribeDispatch(ctx context.Context, sentinel string, timeout time.Duration,
onSubscribed func()) (bool, error) {
var err = s.dispatch(ctx, sentinel, timeout, func(c *Client) error {
return s.subscribeCommand(c, sentinel, onSubscribed)
})
if err != nil {
switch errors.Cause(err) {
case context.Canceled, context.DeadlineExceeded:
return false, nil
default:
return false, err
}
}
return true, nil
}
// subscribeCommand issues SUBSCRIBE "+switch-master" on client, invokes
// onSubscribed once the subscription is acknowledged, and then consumes
// published messages until one announces a master switch belonging to
// this product (returns nil) or the stream yields an error or a
// malformed reply.
func (s *Sentinel) subscribeCommand(client *Client, sentinel string,
	onSubscribed func()) error {
	var channels = []interface{}{"+switch-master"}
	if err := client.Flush("SUBSCRIBE", channels...); err != nil {
		return errors.Trace(err)
	}
	// Validate one subscribe acknowledgement per requested channel:
	// a 3-element reply whose first two strings are
	// ["subscribe", <channel-name>].
	for _, sub := range channels {
		values, err := redigo.Values(client.Receive())
		if err != nil {
			return errors.Trace(err)
		} else if len(values) != 3 {
			return errors.Errorf("invalid response = %v", values)
		}
		// Renamed from 's' — the original shadowed the receiver.
		fields, err := redigo.Strings(values[:2], nil)
		if err != nil || fields[0] != "subscribe" || fields[1] != sub.(string) {
			return errors.Errorf("invalid response = %v", values)
		}
	}
	onSubscribed()
	// Read published messages from the subscribed channel.
	for {
		values, err := redigo.Values(client.Receive())
		if err != nil {
			return errors.Trace(err)
		} else if len(values) < 2 {
			return errors.Errorf("invalid response = %v", values)
		}
		message, err := redigo.Strings(values, nil)
		if err != nil || message[0] != "message" {
			return errors.Errorf("invalid response = %v", values)
		}
		s.printf("sentinel-[%s] subscribe event %v", sentinel, message)
		switch message[1] {
		case "+switch-master":
			if len(message) != 3 {
				return errors.Errorf("invalid response = %v", values)
			}
			// The first space-separated token of the payload is the
			// monitored master's name; check it against our product.
			var params = strings.SplitN(message[2], " ", 2)
			if len(params) != 2 {
				return errors.Errorf("invalid response = %v", values)
			}
			if _, yes := s.isSameProduct(params[0]); yes {
				// A master of this product switched: report success.
				return nil
			}
		}
	}
}
注意,到上面為止,是叢集中的sentinel訂閱了redis伺服器之間主從切換的資訊,只有哨兵知道哪臺是master。對於codis叢集來講,並不清楚哪臺slave被推上了master。下面我們要做的,就是讓哨兵感知到的新的master同樣被codis叢集感知到,也就是將其推到每個group的第一臺server。
最後一步,通過SENTINEL INFO命令得到當前的主伺服器,然後在各個group中更新主伺服器資訊。比方說,如果超過半數sentinel認為group中序號為1的server才是master,就把這臺伺服器和序號為0的server進行交換
// Masters queries every sentinel for the masters it currently knows and
// merges the answers into a map of group-id -> master address. For each
// group, the answer with the highest failover epoch wins. The merged map
// is returned only if a majority of sentinels answered; otherwise an
// error is returned.
func (s *Sentinel) Masters(sentinels []string, timeout time.Duration) (map[int]string, error) {
	cntx, cancel := context.WithTimeout(s.Context, timeout)
	defer cancel()
	// Give each per-connection dispatch slack beyond the context
	// deadline so the context, not the dial, times out first.
	timeout += time.Second * 5
	results := make(chan map[int]*SentinelMaster, len(sentinels))
	// Majority = strictly more than half of the sentinels.
	var majority = 1 + len(sentinels)/2
	for i := range sentinels {
		go func(sentinel string) {
			// Fetch this sentinel's view of the masters (SENTINEL INFO).
			masters, err := s.mastersDispatch(cntx, sentinel, timeout)
			if err != nil {
				s.errorf(err, "sentinel-[%s] masters failed", sentinel)
			}
			results <- masters
		}(sentinels[i])
	}
	masters := make(map[int]string)
	current := make(map[int]*SentinelMaster)
	var voted int
	// alive counts goroutines that may still deliver a result.
	for alive := len(sentinels); ; alive-- {
		if alive == 0 {
			// All results collected without the context firing.
			switch {
			case cntx.Err() != context.DeadlineExceeded && cntx.Err() != nil:
				s.printf("sentinel masters canceled (%v)", cntx.Err())
				return nil, errors.Trace(cntx.Err())
			case voted != len(sentinels):
				s.printf("sentinel masters voted = (%d/%d) masters = %d (%v)", voted, len(sentinels), len(masters), cntx.Err())
			}
			if voted < majority {
				return nil, errors.Errorf("lost majority (%d/%d)", voted, len(sentinels))
			}
			return masters, nil
		}
		select {
		case <-cntx.Done():
			switch {
			case cntx.Err() != context.DeadlineExceeded:
				s.printf("sentinel masters canceled (%v)", cntx.Err())
				return nil, errors.Trace(cntx.Err())
			default:
				s.printf("sentinel masters voted = (%d/%d) masters = %d (%v)", voted, len(sentinels), len(masters), cntx.Err())
			}
			// The accepted answer must carry a majority of the votes.
			if voted < majority {
				return nil, errors.Errorf("lost majority (%d/%d)", voted, len(sentinels))
			}
			return masters, nil
		case m := <-results:
			if m == nil {
				// This sentinel failed; it contributes no vote.
				continue
			}
			// Merge: per group, keep the master with the highest epoch.
			for gid, master := range m {
				if current[gid] == nil || current[gid].Epoch < master.Epoch {
					current[gid] = master
					masters[gid] = master.Addr
				}
			}
			voted += 1
		}
	}
}
// SwitchMasters records the sentinel-elected masters on the Topom and
// tries to apply each one to its group. Per-group failures are logged
// and skipped; the method itself only fails when the Topom is closed.
func (s *Topom) SwitchMasters(masters map[int]string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return ErrClosedTopom
	}
	s.ha.masters = masters
	if len(masters) == 0 {
		return nil
	}
	// Share one run-id cache across all group switches.
	cache := &redis.InfoCache{
		Auth:    s.config.ProductAuth,
		Timeout: time.Millisecond * 100,
	}
	for gid, addr := range masters {
		if err := s.trySwitchGroupMaster(gid, addr, cache); err != nil {
			log.WarnErrorf(err, "sentinel switch group master failed")
		}
	}
	return nil
}
//執行codis叢集可感知的主從切換
func (s *Topom) trySwitchGroupMaster(gid int, master string, cache *redis.InfoCache) error {
ctx, err := s.newContext()
if err != nil {
return err
}
g, err := ctx.getGroup(gid)
if err != nil {
return err
}
var index = func() int {
for i, x := range g.Servers {
if x.Addr == master {
return i
}
}
for i, x := range g.Servers {
rid1 := cache.GetRunId(master)
rid2 := cache.GetRunId(x.Addr)
if rid1 != "" && rid1 == rid2 {
return i
}
}
return -1
}()
if index == -1 {
return errors.Errorf("group-[%d] doesn't have server %s with runid = '%s'", g.Id, master, cache.GetRunId(master))
}
if index == 0 {
return nil
}
defer s.dirtyGroupCache(g.Id)
log.Warnf("group-[%d] will switch master to server[%d] = %s", g.Id, index, g.Servers[index].Addr)
//執行主從切換,我們之前說過,codis叢集中預設每個group的第一個server為master
g.Servers[0], g.Servers[index] = g.Servers[index], g.Servers[0]
g.OutOfSync = true
return s.storeUpdateGroup(g)
}
下一步,在每個Proxy中設定其ha.servers為當前ctx中的sentinel,再執行一次上面的rewatchSentinels方法。
var fut sync2.Future
for _, p := range ctx.proxy {
fut.Add()
go func(p *models.Proxy) {
err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
if err != nil {
log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token)
}
fut.Done(p.Token, err)
}(p)
}
for t, v := range fut.Wait() {
switch err := v.(type) {
case error:
if err != nil {
return errors.Errorf("proxy-[%s] sentinel failed", t)
}
}
}
p.OutOfSync = false
//更新zk資訊
return s.storeUpdateSentinel(p)
總結一下,當一臺sentinel第一次被新增到codis叢集,或者是脫離codis叢集之後,需要執行resync操作來重新對叢集做監控。首先遍歷所有server,放棄其原先監控的資訊。格式化之後,再重新監控叢集中的所有group,並根據dashboard.toml中的配置進行監控設定。最後,新建Topom.ha.monitor上帝視角sentinel,讓叢集中的所有sentinel訂閱"+switch-master",如果發生主從切換(即可以從channel中讀出值),要從哨兵中讀出當前的master地址,並在每個codis group中將對應的server推到group的第一個。設定每個Proxy的ha.servers為當前ctx中的sentinel,再執行一次上面的rewatchSentinels方法,最後再將sentinel的OutOfSync更新為false(表示已完成重同步),然後再更新zk下儲存的資訊。
說明
如有轉載,請註明出處
http://blog.csdn.net/antony9118/article/details/78141271