一文弄懂分散式場景中各種鎖的原理及使用
1. 語言層面的鎖
樂觀鎖:
原子操作中的比較並交換簡稱CAS(Compare And Swap),在sync/atomic包中,這類原子操作由名稱以CompareAndSwap為字首的若干個函式提供func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer,old, new unsafe.Pointer) (swapped bool)使用AddInt32函式對int32值執行新增原子操作:
func main() { var n int32 var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { atomic.AddInt32(&n, 1) wg.Done() }() } wg.Wait() fmt.Println(atomic.LoadInt32(&n)) // output:1000 }
golang中原子操作CompareAndSwap:
CompareAndSwap函式會先判斷引數addr指向的操作值與引數old的值是否相等,僅當此判斷得到的結果是true之後,才會用引數new代表的新值替換掉原先的舊值,否則操作就會被忽略。atmoic原子操作總是假設被操作值未曾被改變(即與舊值相等),並一旦確認這個假設的真實性就立即進行值替換。在被操作值被頻繁變更的情況下,CAS操作並不那麼容易成功所以需要不斷進行嘗試,直到成功為止。
互斥鎖:
golang中互斥鎖的一個經典實現就是sync包下的sync.mutex,下面以併發訪問slice為例:
slice是對陣列一個連續片段的引用,當 slice 長度增加的時候,可能底層的陣列會被換掉。當在換底層陣列之前,切片同時被多個 goroutine 拿到,並執行 append 操作。那麼很多 goroutine 的 append 結果會被覆蓋,導致 n 個 gouroutine append 後,長度小於n,互斥鎖解決併發訪問slice的場景:func main() { slc := make([]int, 0, 1000) var wg sync.WaitGroup var lock sync.Mutex for i := 0; i < 1000; i++ { wg.Add(1) go func(a int) { defer wg.Done() // 加鎖 lock.Lock() defer lock.Unlock() slc = append(slc, a) }(i) } wg.Wait() fmt.Println(len(slc)) }
缺點:分散式部署環境下鎖會失效
2. mysql資料庫實現鎖
方案一:使用資料庫的唯一性來實現資源鎖定,比如主鍵和唯一索引等;建立一個欄位為唯一索引,加入一條資料即表示加鎖了,刪除這條資料就解鎖了;具體,使用很簡單,具體實現就不再闡述 資料庫中,為了實現高併發的資料訪問,對資料進行多版本處理,並通過事務的可見性來保證事務能看到自己應該看到的資料版本, 方案二:select for update解決併發資料查詢更新的問題SET AUTOCOMMIT=0; BEGIN WORK; SELECT category_id FROM blog_article WHERE id=3 FOR UPDATE; UPDATE blog_article SET category_id = 3; # 在commit前其它事物無法對此行資料進行修改 COMMIT WORK;
UPDATE blog_article SET category_id = 2 WHERE id = 3;
會發現事物無法立即執行,會等待for update那條事物commit,如果此時長時間未commit則會超時:
[SQL]UPDATE blog_article SET category_id = 2 WHERE id = 3; [Err] 1205 - Lock wait timeout exceeded; try restarting transaction
缺點:單機mysql負載能力有限,mysql鎖效能低下,select for update加鎖如果where條件後的欄位非主鍵則"表鎖",如果是主鍵則為"行鎖"
3. zookeeper、etcd實現分散式鎖
zookeeper實現分散式鎖:
利用 ZooKeeper 支援臨時順序節點的特性,可以實現分散式鎖;當客戶端對某個方法加鎖時,在 ZooKeeper 中該方法對應的指定節點目錄下,生成一個唯一的臨時有序節點。 ZooKeeper 實現分散式鎖的演算法流程,根節點為 /lock: 客戶端連線 ZooKeeper,並在 /lock 下建立臨時有序子節點,第一個客戶端對應的子節點為 /lock/lock01/00000001,第二個為 /lock/lock01/00000002; 其他客戶端獲取 /lock01 下的子節點列表,判斷自己建立的子節點是否為當前列表中序號最小的子節點; 如果是則認為獲得鎖,執行業務程式碼,否則通過 watch 事件監聽 /lock01 的子節點變更訊息,獲得變更通知後重復此步驟直至獲得鎖; 完成業務流程後,刪除對應的子節點,釋放分散式鎖。
// 建立zookeeper連線,並建立永久父級節點 func NewZkConn(address, parentPath string) *zk.Conn { hosts := []string{address} conn, _, err := zk.Connect(hosts, time.Second*5) if err != nil { panic(err) } ok, _, _ := conn.Exists(parentPath) if !ok { // 建立永久節點 nodeName, err := conn.Create(parentPath, nil, zk.FlagSequence, acls) if err != nil { panic(err) } fmt.Println("create node name :", nodeName) } return conn } // nodeCreateSuccess 當前節點是否已成功建立 func nodeCreateSuccess(conn *zk.Conn, path string, id int) bool { ok, _, ch, err := conn.ExistsW(path) if err != nil { return false } ex := false // 節點已存在,則監聽狀態變化 if ok { for { select { case c := <-ch: { if c.Type == zk.EventNodeDeleted { ex = true break } } } if ex { break } } } // 節點不存在則嘗試建立 _, err = conn.Create(path, nil, flags, acls) if err != nil { return false } fmt.Printf("[%s] 創造節點的id為 [%d] \n", path, id) return true } func main() { conn := NewZkConn(zkHosts, parentPath) // 假設臨時節點 path := parentPath + "/001_test_zookeeper_lock" for i := 0; i < 10; i++ { go func(conn *zk.Conn, path string, id int) { // 節點未建立成功則阻塞等待 for { ok := nodeCreateSuccess(conn, path, id) // ok=true表示當前節點已成功建立 if ok { // 釋放當前節點鎖 err := conn.Delete(path, 0) if err != nil { fmt.Println(err) } fmt.Printf("刪除成功 id為[%d] \n", id) break } } }(conn, path, i) } time.Sleep(time.Second * 10) }etcd實現分散式鎖: etcd有個很重要的特性,它的key value是多版本的,當有了一個值之後,再put時它的版本是不斷地往上加的,這裡跟zookeeper類似,判斷是否是最小的版本
- 利用租約在etcd叢集中建立多個key,這個key有兩種形態,存在和不存在,而這兩種形態就是互斥量。
- 通過Prefix字首機制獲取字首目錄下所有KV及Revision,通過Revision機制判斷當前執行緒是否能獲取到鎖。
- 通過Watch監聽機制來監聽前一個Revision的刪除事件。
func main() { var ( config clientv3.Config client *clientv3.Client lease clientv3.Lease leaseResp *clientv3.LeaseGrantResponse leaseId clientv3.LeaseID leaseRespChan <-chan *clientv3.LeaseKeepAliveResponse err error ) //客戶端配置 config = clientv3.Config{ Endpoints: []string{"etcd2.sndu.cn:2379"}, DialTimeout: 5 * time.Second, } //建立連線 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } //上鎖(建立租約,自動續租) lease = clientv3.NewLease(client) //設定1個ctx取消自動續租 執行cancleFunc即執行cancel操作 ctx, cancleFunc := context.WithCancel(context.TODO()) //設定10秒租約(過期時間) if leaseResp, err = lease.Grant(context.TODO(), 10); err != nil { fmt.Println(err) return } //拿到租約id leaseId = leaseResp.ID //自動續租(不停地往管道中扔租約資訊) if leaseRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil { fmt.Println(err) } //啟動多個協程去監聽 go listenLeaseChan(leaseRespChan) //業務處理 kv := clientv3.NewKV(client) //建立事務 txn := kv.Txn(context.TODO()) txn.If(clientv3.Compare(clientv3.CreateRevision("/lock/20201029-etcd"), "=", 0)). Then(clientv3.OpPut("/lock/20201029-etcd", "true", clientv3.WithLease(leaseId))). Else(clientv3.OpGet("/lock/20201029-etcd")) //否則搶鎖失敗 //提交事務 if txtResp, err := txn.Commit(); err != nil { fmt.Println(err) return } else { //判斷是否搶鎖 if !txtResp.Succeeded { fmt.Println("鎖被佔用:", string(txtResp.Responses[0].GetResponseRange().Kvs[0].Value)) return } } fmt.Println("處理任務") //釋放鎖(停止續租,終止租約) defer cancleFunc() //函式退出取消自動續租 defer lease.Revoke(context.TODO(), leaseId) //終止租約(去掉過期時間) time.Sleep(10 * time.Second) } // listenLeaseChan 監聽租約情況 func listenLeaseChan(leaseRespChan <-chan *clientv3.LeaseKeepAliveResponse) { var leaseKeepResp *clientv3.LeaseKeepAliveResponse for { select { case leaseKeepResp = <-leaseRespChan: if leaseKeepResp == nil { fmt.Println("租約失效了") goto END } else { fmt.Println(leaseKeepResp.ID) } } } END: }
在etcd官方的實現中其實已經實現了分散式鎖,具體實現程式碼在https://github.com/etcd-io/etcd/blob/master/client/v3/concurrency/mutex.go目錄下:
// TryLock 嘗試加鎖 比較revision是否為最小版本 func (m *Mutex) TryLock(ctx context.Context) error { resp, err := m.tryAcquire(ctx) if err != nil { return err } // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } client := m.s.Client() // Cannot lock, so delete the key if _, err := client.Delete(ctx, m.myKey); err != nil { return err } m.myKey = "\x00" m.myRev = -1 return ErrLocked } // Lock locks the mutex with a cancelable context. If the context is canceled // while trying to acquire the lock, the mutex tries to clean its stale lock entry. func (m *Mutex) Lock(ctx context.Context) error { resp, err := m.tryAcquire(ctx) if err != nil { return err } // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } client := m.s.Client() // wait for deletion revisions prior to myKey // TODO: early termination if the session key is deleted before other session keys with smaller revisions. _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed if werr != nil { m.Unlock(client.Ctx()) return werr } // make sure the session is not expired, and the owner key still exists. gresp, werr := client.Get(ctx, m.myKey) if werr != nil { m.Unlock(client.Ctx()) return werr } if len(gresp.Kvs) == 0 { // is the session key lost? return ErrSessionExpired } m.hdr = gresp.Header return nil } // tryAcquire 嘗試釋放鎖 func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { s := m.s client := m.s.Client() m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) // put self in lock waiters via myKey; oldest waiter holds lock put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) // reuse key in case this session already holds the lock get := v3.OpGet(m.myKey) // fetch current holder to complete uncontended path with only one RPC getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() if err != nil { return nil, err } m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } return resp, nil } // Unlock 釋放鎖 刪除節點資訊 func (m *Mutex) Unlock(ctx context.Context) error { client := m.s.Client() if _, err := client.Delete(ctx, m.myKey); err != nil { return err } m.myKey = "\x00" m.myRev = -1 return nil }
4. redis實現分散式鎖
Redis分散式鎖控制併發主要是通過在Redis裡面建立一個key,當其它程序準備佔用的時候只能等待key釋放再佔用。Redis裡面有一個原子性指令setnx,當key存在時,它返回0,表示當前已有程序佔用,當它返回1時可以執行業務邏輯,此時沒有程序佔用,等邏輯執行完後,可以刪除key釋放鎖,這樣可以簡單的控制併發:
127.0.0.1:6379> setnx distributedKey aaa (integer) 1 127.0.0.1:6379> setnx distributedKey aaa (integer) 0 127.0.0.1:6379> get distributedKey "aaa" 127.0.0.1:6379>
在業務邏輯執行的過程中如果發生異常,此時key並沒有刪除,這樣就會造成死鎖,死鎖帶來的後果想必大家都很清楚。為了解決這個問題,可以在setnx加鎖後設置key的過期時間,當key到期自動刪除:
127.0.0.1:6379> expire distributedKey 5 (integer) 1 127.0.0.1:6379>
如果在執行setnx後,執行expire前Redis發生宕機了,這樣就不會執行expire,也會造成死鎖。由於setnx與expire是兩條命令,並且expire依賴setnx的執行結果,為了解決這個問題可以使用set key value [expiration EX seconds|PX milliseconds] [NX|XX] ,這是一條原子性的指令,同時包含setnx和expire:
127.0.0.1:6379> set distributedKey aaa ex 5 nx OK 127.0.0.1:6379> set distributedKey aaa ex 5 nx (nil)
key存在時執行會返回nil,只有key過期或不存在時才會返回ok
// DistributedLock 併發鎖 func DistributedLock(key string, expire int, c redis.Conn, value time.Time) (bool, error) { // 設定原子鎖 defer c.Close() exists, err := c.Do("set", key, value, "nx", "ex", expire) if err != nil { return false, errors.New("執行 set nx ex 失敗") } // 鎖已存在,已被佔用 if exists != nil { return false, nil } return true, nil } // ReleaseLock 釋放鎖 func ReleaseLock(c redis.Conn, key string) (bool, error) { defer c.Close() v, err := redis.Bool(c.Do("DEL", key)) return v, err }
呼叫:
func DoSomething(c redis.Conn, key string, expire int, value time.Time) { // 獲取鎖 defer c.Close() canUse, err := DistributedLock(key, expire, c, value) if err != nil { panic(err) } // 佔用鎖 if canUse { fmt.Println("start do something ...") // 釋放鎖 _, err := ReleaseLock(c, key) if err != nil { panic(err) } } return }
redis釋放鎖的問題:
如上圖所示,執行緒A先獲得鎖,執行超時鎖自動釋放,此時執行緒B獲取鎖開始執行,A執行完後釋放了B所持有的鎖,這時B繼續執行,並且執行緒C能獲取鎖,同一時刻執行緒A和B同時執行鎖,違背了分散式鎖的安全性。
5. redis+lua實現原子性釋放分散式鎖
定義lua指令碼釋放鎖:
const ( // ScriptDeleteLock 釋放redis併發鎖 lua指令碼 判斷value為本次鎖的value才釋放 ScriptDeleteLock = ` if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end ` ) // ReleaseLockWithLua 釋放鎖 使用lua指令碼執行 func ReleaseLockWithLua(c redis.Conn, key string, value time.Time) (int, error) { // keyCount表示lua指令碼中key的個數 defer c.Close() lua := redis.NewScript(1, ScriptDeleteLock) // lua指令碼中的引數為key和value res, err := redis.Int(lua.Do(c, key, value)) if err != nil { return 0, err } return res, nil }
呼叫:
func DoSomethingWithLua(c redis.Conn, key string, expire int, value time.Time) { // 獲取鎖 defer c.Close() canUse, err := DistributedLock(key, expire, c, value) if err != nil { panic(err) } // 佔用鎖 if canUse { fmt.Println("start do something ...") // 釋放鎖 lua指令碼執行原子性刪除 _, err := ReleaseLockWithLua(c, key, value) if err != nil { panic(err) } } return }
redis sentinel叢集下鎖的同步問題:
如上圖所示,執行緒在master成功建立鎖,此時鎖還未同步到slave,master發生宕機,當slave1成我新master後鎖丟失。
6. redlock演算法及相關問題
redlock演算法流程
如上圖所示,redlock演算法的實現流程,每次加鎖的時候嘗試向redis叢集中每個節點申請加鎖,當前節點加鎖失敗則跳過繼續向下一個節點執行加鎖請求,只有大於一半的節點加鎖成功才認為分散式鎖成功;釋放鎖時同樣需配合lua指令碼向所有的redis節點發起釋放鎖請求。
redlock演算法跳躍時鐘問題
上述redlock演算法已經解決了redis叢集中master宕機導致鎖失效的問題,但是它是否就是完美的呢?如上圖所示,client1向redis叢集申請加鎖,此時節點A、B、C執行成功,client1成功獲取鎖,節點D和E由於網路原因加鎖失敗;這時節點C所在的伺服器由於時鐘向前跳躍導致鎖快速過期了,client2執行加鎖請求,顯然此時是能加鎖成功的;那麼相當於在同一時刻兩個程序能持有鎖,這顯然違背了分散式鎖的互斥性的特點。
redlock演算法GC停頓問題
同樣,還有一種特殊的情況就是GC停頓導致訊息延遲的問題,當client1向redis叢集發起加鎖請求並返回加鎖成功的結果,此時訊息延遲到達client1導致在這段時間redis叢集中的鎖過期了,client2顯然能夠正常獲取鎖,當GC恢復時client1收到結果會認為自己持有鎖,這同樣違背了分散式鎖互斥性的特點。
7. 總結
以上幾種鎖的實現方式並非說明哪種是最優解,具體場景需選擇具體的鎖。如果是單機環境建議直接使用語言層面的鎖來實現,這樣不需要引入額外的第三方依賴;如果是對資料庫的併發更新操作,並且併發量不是太大,可以使用mysql的select for update或者select for update nowait實現,但是注意儘量不要使用表鎖並且不要造成死鎖的問題;如果是對鎖的可靠性要求極高那麼建議使用zookeeper、etcd實現;最後如果在開發環境中沒有zookeeper、etcd等第三方元件,並且對鎖的效能要求比較高,可以使用單機的redis配合lua指令碼釋放鎖,這裡我個人並不推薦使用redloc