1. 程式人生 > >一文弄懂分散式場景中各種鎖的原理及使用

一文弄懂分散式場景中各種鎖的原理及使用

1. 語言層面的鎖

樂觀鎖:

原子操作中的比較並交換簡稱CAS(Compare And Swap),在sync/atomic包中,這類原子操作由名稱以CompareAndSwap為字首的若干個函式提供
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer,old, new unsafe.Pointer) (swapped bool)
使用AddInt32函式對int32值執行新增原子操作:
func main() {
	var n int32
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			atomic.AddInt32(&n, 1)
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&n)) // output:1000
}

golang中原子操作CompareAndSwap:

CompareAndSwap函式會先判斷引數addr指向的操作值與引數old的值是否相等,僅當此判斷得到的結果是true之後,才會用引數new代表的新值替換掉原先的舊值,否則操作就會被忽略。atmoic原子操作總是假設被操作值未曾被改變(即與舊值相等),並一旦確認這個假設的真實性就立即進行值替換。在被操作值被頻繁變更的情況下,CAS操作並不那麼容易成功所以需要不斷進行嘗試,直到成功為止。

互斥鎖:

golang中互斥鎖的一個經典實現就是sync包下的sync.mutex,下面以併發訪問slice為例:

slice是對陣列一個連續片段的引用,當 slice 長度增加的時候,可能底層的陣列會被換掉。當在換底層陣列之前,切片同時被多個 goroutine 拿到,並執行 append 操作。那麼很多 goroutine 的 append 結果會被覆蓋,導致 n 個 gouroutine append 後,長度小於n,互斥鎖解決併發訪問slice的場景:
func main() {
	slc := make([]int, 0, 1000)
	var wg sync.WaitGroup
	var lock sync.Mutex

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(a int) {
			defer wg.Done()
			// 加鎖
			lock.Lock()
			defer lock.Unlock()
			slc = append(slc, a)
		}(i)
	}
	wg.Wait()
	fmt.Println(len(slc))
}

缺點:分散式部署環境下鎖會失效

 

2. mysql資料庫實現鎖

方案一:使用資料庫的唯一性來實現資源鎖定,比如主鍵和唯一索引等;建立一個欄位為唯一索引,加入一條資料即表示加鎖了,刪除這條資料就解鎖了;具體,使用很簡單,具體實現就不再闡述 資料庫中,為了實現高併發的資料訪問,對資料進行多版本處理,並通過事務的可見性來保證事務能看到自己應該看到的資料版本, 方案二:select for update解決併發資料查詢更新的問題
SET AUTOCOMMIT=0; 
BEGIN WORK; 
SELECT category_id FROM blog_article WHERE id=3 FOR UPDATE;
UPDATE blog_article SET category_id = 3;
# 在commit前其它事物無法對此行資料進行修改
COMMIT WORK;
在另外一個視窗執行新的事物修改:
UPDATE blog_article SET category_id = 2 WHERE id = 3;

會發現事物無法立即執行,會等待for update那條事物commit,如果此時長時間未commit則會超時:

[SQL]UPDATE blog_article SET category_id = 2 WHERE id = 3;
[Err] 1205 - Lock wait timeout exceeded; try restarting transaction

 

缺點:單機mysql負載能力有限,mysql鎖效能低下,select for update加鎖如果where條件後的欄位非主鍵則"表鎖",如果是主鍵則為"行鎖"  

3. zookeeper、etcd實現分散式鎖

zookeeper實現分散式鎖:

利用 ZooKeeper 支援臨時順序節點的特性,可以實現分散式鎖;當客戶端對某個方法加鎖時,在 ZooKeeper 中該方法對應的指定節點目錄下,生成一個唯一的臨時有序節點。 ZooKeeper 實現分散式鎖的演算法流程,根節點為 /lock: 客戶端連線 ZooKeeper,並在 /lock 下建立臨時有序子節點,第一個客戶端對應的子節點為 /lock/lock01/00000001,第二個為 /lock/lock01/00000002; 其他客戶端獲取 /lock01 下的子節點列表,判斷自己建立的子節點是否為當前列表中序號最小的子節點; 如果是則認為獲得鎖,執行業務程式碼,否則通過 watch 事件監聽 /lock01 的子節點變更訊息,獲得變更通知後重復此步驟直至獲得鎖; 完成業務流程後,刪除對應的子節點,釋放分散式鎖。
// 建立zookeeper連線,並建立永久父級節點
func NewZkConn(address, parentPath string) *zk.Conn {
	hosts := []string{address}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		panic(err)
	}
	ok, _, _ := conn.Exists(parentPath)
	if !ok {
		// 建立永久節點
		nodeName, err := conn.Create(parentPath, nil, zk.FlagSequence, acls)
		if err != nil {
			panic(err)
		}
		fmt.Println("create node name :", nodeName)
	}
	return conn
}

// nodeCreateSuccess 當前節點是否已成功建立
func nodeCreateSuccess(conn *zk.Conn, path string, id int) bool {
	ok, _, ch, err := conn.ExistsW(path)
	if err != nil {
		return false
	}

	ex := false
	// 節點已存在,則監聽狀態變化
	if ok {
		for {
			select {
			case c := <-ch:
				{
					if c.Type == zk.EventNodeDeleted {
						ex = true
						break
					}
				}
			}
			if ex {
				break
			}
		}
	}

	// 節點不存在則嘗試建立
	_, err = conn.Create(path, nil, flags, acls)
	if err != nil {
		return false
	}
	fmt.Printf("[%s] 創造節點的id為 [%d] \n", path, id)
	return true
}

func main() {
	conn := NewZkConn(zkHosts, parentPath)
	// 假設臨時節點
	path := parentPath + "/001_test_zookeeper_lock"
	for i := 0; i < 10; i++ {
		go func(conn *zk.Conn, path string, id int) {
			// 節點未建立成功則阻塞等待
			for {
				ok := nodeCreateSuccess(conn, path, id)
				// ok=true表示當前節點已成功建立
				if ok {
					// 釋放當前節點鎖
					err := conn.Delete(path, 0)
					if err != nil {
						fmt.Println(err)
					}
					fmt.Printf("刪除成功 id為[%d] \n", id)
					break
				}
			}
		}(conn, path, i)
	}

	time.Sleep(time.Second * 10)
}
etcd實現分散式鎖: etcd有個很重要的特性,它的key value是多版本的,當有了一個值之後,再put時它的版本是不斷地往上加的,這裡跟zookeeper類似,判斷是否是最小的版本
  1. 利用租約在etcd叢集中建立多個key,這個key有兩種形態,存在和不存在,而這兩種形態就是互斥量。
  2. 通過Prefix字首機制獲取字首目錄下所有KV及Revision,通過Revision機制判斷當前執行緒是否能獲取到鎖。
  3. 通過Watch監聽機制來監聽前一個Revision的刪除事件。
func main() {
	var (
		config        clientv3.Config
		client        *clientv3.Client
		lease         clientv3.Lease
		leaseResp     *clientv3.LeaseGrantResponse
		leaseId       clientv3.LeaseID
		leaseRespChan <-chan *clientv3.LeaseKeepAliveResponse
		err           error
	)
	//客戶端配置
	config = clientv3.Config{
		Endpoints:   []string{"etcd2.sndu.cn:2379"},
		DialTimeout: 5 * time.Second,
	}
	//建立連線
	if client, err = clientv3.New(config); err != nil {
		fmt.Println(err)
		return
	}
	//上鎖(建立租約,自動續租)
	lease = clientv3.NewLease(client)
	//設定1個ctx取消自動續租  執行cancleFunc即執行cancel操作
	ctx, cancleFunc := context.WithCancel(context.TODO())
	//設定10秒租約(過期時間)
	if leaseResp, err = lease.Grant(context.TODO(), 10); err != nil {
		fmt.Println(err)
		return
	}
	//拿到租約id
	leaseId = leaseResp.ID
	//自動續租(不停地往管道中扔租約資訊)
	if leaseRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
		fmt.Println(err)
	}
	//啟動多個協程去監聽
	go listenLeaseChan(leaseRespChan)
	//業務處理
	kv := clientv3.NewKV(client)
	//建立事務
	txn := kv.Txn(context.TODO())
	txn.If(clientv3.Compare(clientv3.CreateRevision("/lock/20201029-etcd"), "=", 0)).
		Then(clientv3.OpPut("/lock/20201029-etcd", "true",
			clientv3.WithLease(leaseId))).
		Else(clientv3.OpGet("/lock/20201029-etcd")) //否則搶鎖失敗
	//提交事務
	if txtResp, err := txn.Commit(); err != nil {
		fmt.Println(err)
		return
	} else {
		//判斷是否搶鎖
		if !txtResp.Succeeded {
			fmt.Println("鎖被佔用:",
				string(txtResp.Responses[0].GetResponseRange().Kvs[0].Value))
			return
		}
	}
	fmt.Println("處理任務")
	//釋放鎖(停止續租,終止租約)
	defer cancleFunc()                          //函式退出取消自動續租
	defer lease.Revoke(context.TODO(), leaseId) //終止租約(去掉過期時間)
	time.Sleep(10 * time.Second)
}

// listenLeaseChan 監聽租約情況
func listenLeaseChan(leaseRespChan <-chan *clientv3.LeaseKeepAliveResponse) {
	var leaseKeepResp *clientv3.LeaseKeepAliveResponse
	for {
		select {
		case leaseKeepResp = <-leaseRespChan:
			if leaseKeepResp == nil {
				fmt.Println("租約失效了")
				goto END
			} else {
				fmt.Println(leaseKeepResp.ID)
			}
		}
	}
END:
}

在etcd官方的實現中其實已經實現了分散式鎖,具體實現程式碼在https://github.com/etcd-io/etcd/blob/master/client/v3/concurrency/mutex.go目錄下:

// TryLock 嘗試加鎖 比較revision是否為最小版本
func (m *Mutex) TryLock(ctx context.Context) error {
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	client := m.s.Client()
	// Cannot lock, so delete the key
	if _, err := client.Delete(ctx, m.myKey); err != nil {
		return err
	}
	m.myKey = "\x00"
	m.myRev = -1
	return ErrLocked
}

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	client := m.s.Client()
	// wait for deletion revisions prior to myKey
	// TODO: early termination if the session key is deleted before other session keys with smaller revisions.
	_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}

	// make sure the session is not expired, and the owner key still exists.
	gresp, werr := client.Get(ctx, m.myKey)
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}

	if len(gresp.Kvs) == 0 { // is the session key lost?
		return ErrSessionExpired
	}
	m.hdr = gresp.Header

	return nil
}

// tryAcquire 嘗試釋放鎖
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
	s := m.s
	client := m.s.Client()

	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put self in lock waiters via myKey; oldest waiter holds lock
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	return resp, nil
}

// Unlock 釋放鎖 刪除節點資訊
func (m *Mutex) Unlock(ctx context.Context) error {
	client := m.s.Client()
	if _, err := client.Delete(ctx, m.myKey); err != nil {
		return err
	}
	m.myKey = "\x00"
	m.myRev = -1
	return nil
}

4. redis實現分散式鎖

Redis分散式鎖控制併發主要是通過在Redis裡面建立一個key,當其它程序準備佔用的時候只能等待key釋放再佔用。Redis裡面有一個原子性指令setnx,當key存在時,它返回0,表示當前已有程序佔用,當它返回1時可以執行業務邏輯,此時沒有程序佔用,等邏輯執行完後,可以刪除key釋放鎖,這樣可以簡單的控制併發:

127.0.0.1:6379> setnx distributedKey aaa
(integer) 1
127.0.0.1:6379> setnx distributedKey aaa
(integer) 0
127.0.0.1:6379> get distributedKey
"aaa"
127.0.0.1:6379> 

在業務邏輯執行的過程中如果發生異常,此時key並沒有刪除,這樣就會造成死鎖,死鎖帶來的後果想必大家都很清楚。為了解決這個問題,可以在setnx加鎖後設置key的過期時間,當key到期自動刪除:

127.0.0.1:6379> expire distributedKey 5
(integer) 1
127.0.0.1:6379>

如果在執行setnx後,執行expire前Redis發生宕機了,這樣就不會執行expire,也會造成死鎖。由於setnx與expire是兩條命令,並且expire依賴setnx的執行結果,為了解決這個問題可以使用set key value [expiration EX seconds|PX milliseconds] [NX|XX] ,這是一條原子性的指令,同時包含setnx和expire:

127.0.0.1:6379> set distributedKey aaa ex 5 nx
OK
127.0.0.1:6379> set distributedKey aaa ex 5 nx
(nil)

key存在時執行會返回nil,只有key過期或不存在時才會返回ok

// DistributedLock 併發鎖
func DistributedLock(key string, expire int, c redis.Conn, value time.Time) (bool, error) {
	// 設定原子鎖
	defer c.Close()
	exists, err := c.Do("set", key, value, "nx", "ex", expire)
	if err != nil {
		return false, errors.New("執行 set nx ex 失敗")
	}

	// 鎖已存在,已被佔用
	if exists != nil {
		return false, nil
	}

	return true, nil
}

// ReleaseLock 釋放鎖
func ReleaseLock(c redis.Conn, key string) (bool, error) {
	defer c.Close()
	v, err := redis.Bool(c.Do("DEL", key))
	return v, err
}

呼叫:

func DoSomething(c redis.Conn, key string, expire int, value time.Time) {
	// 獲取鎖
	defer c.Close()
	canUse, err := DistributedLock(key, expire, c, value)
	if err != nil {
		panic(err)
	}
	// 佔用鎖
	if canUse {
		fmt.Println("start do something ...")
		// 釋放鎖
		_, err := ReleaseLock(c, key)
		if err != nil {
			panic(err)
		}
	}
	return
}

 redis釋放鎖的問題:

如上圖所示,執行緒A先獲得鎖,執行超時鎖自動釋放,此時執行緒B獲取鎖開始執行,A執行完後釋放了B所持有的鎖,這時B繼續執行,並且執行緒C能獲取鎖,同一時刻執行緒A和B同時執行鎖,違背了分散式鎖的安全性。

5. redis+lua實現原子性釋放分散式鎖

定義lua指令碼釋放鎖:

const (
	// ScriptDeleteLock 釋放redis併發鎖 lua指令碼 判斷value為本次鎖的value才釋放
	ScriptDeleteLock = `
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
`
)

// ReleaseLockWithLua 釋放鎖 使用lua指令碼執行
func ReleaseLockWithLua(c redis.Conn, key string, value time.Time) (int, error) {
	// keyCount表示lua指令碼中key的個數
	defer c.Close()
	lua := redis.NewScript(1, ScriptDeleteLock)
	// lua指令碼中的引數為key和value
	res, err := redis.Int(lua.Do(c, key, value))
	if err != nil {
		return 0, err
	}
	return res, nil
}

呼叫:

func DoSomethingWithLua(c redis.Conn, key string, expire int, value time.Time) {
	// 獲取鎖
	defer c.Close()
	canUse, err := DistributedLock(key, expire, c, value)
	if err != nil {
		panic(err)
	}
	// 佔用鎖
	if canUse {
		fmt.Println("start do something ...")
		// 釋放鎖 lua指令碼執行原子性刪除
		_, err := ReleaseLockWithLua(c, key, value)
		if err != nil {
			panic(err)
		}
	}
	return
}

redis sentinel叢集下鎖的同步問題:

  

如上圖所示,執行緒在master成功建立鎖,此時鎖還未同步到slave,master發生宕機,當slave1成我新master後鎖丟失。

6. redlock演算法及相關問題

redlock演算法流程

如上圖所示,redlock演算法的實現流程,每次加鎖的時候嘗試向redis叢集中每個節點申請加鎖,當前節點加鎖失敗則跳過繼續向下一個節點執行加鎖請求,只有大於一半的節點加鎖成功才認為分散式鎖成功;釋放鎖時同樣需配合lua指令碼向所有的redis節點發起釋放鎖請求。

redlock演算法跳躍時鐘問題

上述redlock演算法已經解決了redis叢集中master宕機導致鎖失效的問題,但是它是否就是完美的呢?如上圖所示,client1向redis叢集申請加鎖,此時節點A、B、C執行成功,client1成功獲取鎖,節點D和E由於網路原因加鎖失敗;這時節點C所在的伺服器由於時鐘向前跳躍導致鎖快速過期了,client2執行加鎖請求,顯然此時是能加鎖成功的;那麼相當於在同一時刻兩個程序能持有鎖,這顯然違背了分散式鎖的互斥性的特點。

redlock演算法GC停頓問題

 

同樣,還有一種特殊的情況就是GC停頓導致訊息延遲的問題,當client1向redis叢集發起加鎖請求並返回加鎖成功的結果,此時訊息延遲到達client1導致在這段時間redis叢集中的鎖過期了,client2顯然能夠正常獲取鎖,當GC恢復時client1收到結果會認為自己持有鎖,這同樣違背了分散式鎖互斥性的特點。

 

7. 總結

以上幾種鎖的實現方式並非說明哪種是最優解,具體場景需選擇具體的鎖。如果是單機環境建議直接使用語言層面的鎖來實現,這樣不需要引入額外的第三方依賴;如果是對資料庫的併發更新操作,並且併發量不是太大,可以使用mysql的select for update或者select for update nowait實現,但是注意儘量不要使用表鎖並且不要造成死鎖的問題;如果是對鎖的可靠性要求極高那麼建議使用zookeeper、etcd實現;最後如果在開發環境中沒有zookeeper、etcd等第三方元件,並且對鎖的效能要求比較高,可以使用單機的redis配合lua指令碼釋放鎖,這裡我個人並不推薦使用redloc