java分散式鎖的實現方式
原文出自:https://blog.csdn.net/seesun2012
### 什麼是鎖?
- 在單程序的系統中,當存在多個執行緒可以同時改變某個變數(可變共享變數)時,就需要對變數或程式碼塊做同步,使其在修改這種變數時能夠線性執行消除併發修改變數。
- 而同步的本質是通過鎖來實現的。為了實現多個執行緒在一個時刻同一個程式碼塊只能有一個執行緒可執行,那麼需要在某個地方做個標記,這個標記必須每個執行緒都能看到,當標記不存在時可以設定該標記,其餘後續執行緒發現已經有標記了則等待擁有標記的執行緒結束同步程式碼塊取消標記後再去嘗試設定標記。這個標記可以理解為鎖。
- 不同地方實現鎖的方式也不一樣,只要能滿足所有執行緒都能看得到標記即可。如 Java 中 synchronize 是在物件頭設定標記,Lock 介面的實現類基本上都只是某一個 volitile 修飾的 int 型變數其保證每個執行緒都能擁有對該 int 的可見性和原子修改,linux 核心中也是利用互斥量或訊號量等記憶體資料做標記。
- 除了利用記憶體資料做鎖其實任何互斥的都能做鎖(只考慮互斥情況),如流水錶中流水號與時間結合做冪等校驗可以看作是一個不會釋放的鎖,或者使用某個檔案是否存在作為鎖等。只需要滿足在對標記進行修改能保證原子性和記憶體可見性即可。
### 什麼是分散式?
分散式的 CAP 理論告訴我們:
任何一個分散式系統都無法同時滿足一致性(Consistency)、可用性(Availability)和分割槽容錯性(Partition tolerance),最多隻能同時滿足兩項。
目前很多大型網站及應用都是分散式部署的,分散式場景中的資料一致性問題一直是一個比較重要的話題。基於 CAP理論,很多系統在設計之初就要對這三者做出取捨。在網際網路領域的絕大多數的場景中,都需要犧牲強一致性來換取系統的高可用性,系統往往只需要保證最終一致性。
分散式場景
此處主要指叢集模式下,多個相同服務同時開啟.
在許多的場景中,我們為了保證資料的最終一致性,需要很多的技術方案來支援,比如分散式事務
、分散式鎖
等。很多時候我們需要保證一個方法在同一時間內只能被同一個執行緒執行。在單機環境中,通過 Java 提供的併發 API 我們可以解決,但是在分散式環境下,就沒有那麼簡單啦。
- 分散式與單機情況下最大的不同在於其不是多執行緒而是
多程序
。 - 多執行緒由於可以共享堆記憶體,因此可以簡單的採取記憶體作為標記儲存位置。而程序之間甚至可能都不在同一臺物理機上,因此需要將標記儲存在一個所有程序都能看到的地方。
-
什麼是分散式鎖?
- 當在分散式模型下,資料只有一份(或有限制),此時需要利用鎖的技術控制某一時刻修改資料的程序數。
- 與單機模式下的鎖不僅需要保證程序可見,還需要考慮程序與鎖之間的網路問題。(我覺得分散式情況下之所以問題變得複雜,主要就是需要考慮到網路的延時和不可靠。。。一個大坑)
- 分散式鎖還是可以將標記存在記憶體,只是該記憶體不是某個程序分配的記憶體而是公共記憶體如 Redis、Memcache。至於利用資料庫、檔案等做鎖與單機的實現是一樣的,只要保證標記能互斥就行。
### 我們需要怎樣的分散式鎖?
- 可以保證在分散式部署的應用叢集中,同一個方法在同一時間只能被一臺機器-上的一個執行緒執行。
- 這把鎖要是一把可重入鎖(避免死鎖)
- 這把鎖最好是一把阻塞鎖(根據業務需求考慮要不要這條)
- 這把鎖最好是一把公平鎖(根據業務需求考慮要不要這條)
- 有高可用的獲取鎖和釋放鎖功能
- 獲取鎖和釋放鎖的效能要好
### 基於資料庫做分散式鎖
基於樂觀鎖
基於表主鍵唯一做分散式鎖
思路:利用主鍵唯一的特性,如果有多個請求同時提交到資料庫的話,資料庫會保證只有一個操作可以成功,那麼我們就可以認為操作成功的那個執行緒獲得了該方法的鎖,當方法執行完畢之後,想要釋放鎖的話,刪除這條資料庫記錄即可。
上面這種簡單的實現有以下幾個問題:
- 這把鎖強依賴資料庫的可用性,資料庫是一個單點,一旦資料庫掛掉,會導致業務系統不可用。
- 這把鎖沒有失效時間,一旦解鎖操作失敗,就會導致鎖記錄一直在資料庫中,其他執行緒無法再獲得到鎖。
- 這把鎖只能是非阻塞的,因為資料的 insert操作,一旦插入失敗就會直接報錯。沒有獲得鎖的執行緒並不會進入排隊佇列,要想再次獲得鎖就要再次觸發獲得鎖操作。
- 這把鎖是非重入的,同一個執行緒在沒有釋放鎖之前無法再次獲得該鎖。因為資料中資料已經存在了。
- 這把鎖是非公平鎖,所有等待鎖的執行緒憑運氣去爭奪鎖。
- 在 MySQL 資料庫中採用主鍵衝突防重,在大併發情況下有可能會造成鎖表現象。
當然,我們也可以有其他方式解決上面的問題。
- 資料庫是單點?搞兩個資料庫,資料之前雙向同步,一旦掛掉快速切換到備庫上。
- 沒有失效時間?只要做一個定時任務,每隔一定時間把資料庫中的超時資料清理一遍。
- 非阻塞的?搞一個 while 迴圈,直到 insert 成功再返回成功。
- 非重入的?在資料庫表中加個欄位,記錄當前獲得鎖的機器的主機資訊和執行緒資訊,那麼下次再獲取鎖的時候先查詢資料庫,如果當前機器的主機資訊和執行緒資訊在資料庫可以查到的話,直接把鎖分配給他就可以了。
- 非公平的?再建一張中間表,將等待鎖的執行緒全記錄下來,並根據建立時間排序,只有最先建立的允許獲取鎖。
- 比較好的辦法是在程式中生產主鍵進行防重。
##### 基於表字段版本號做分散式鎖
這個策略源於 mysql 的 mvcc 機制,使用這個策略其實本身沒有什麼問題,唯一的問題就是對資料表侵入較大,我們要為每個表設計一個版本號欄位,然後寫一條判斷 sql 每次進行判斷,增加了資料庫操作的次數,在高併發的要求下,對資料庫連線的開銷也是無法忍受的。
基於悲觀鎖
##### 基於資料庫排他鎖做分散式鎖
在查詢語句後面增加for update
,資料庫會在查詢過程中給資料庫表增加排他鎖 (注意: InnoDB 引擎在加鎖的時候,只有通過索引進行檢索的時候才會使用行級鎖,否則會使用表級鎖。這裡我們希望使用行級鎖,就要給要執行的方法欄位名新增索引,值得注意的是,這個索引一定要建立成唯一索引,否則會出現多個過載方法之間無法同時被訪問的問題。過載方法的話建議把引數型別也加上。)。當某條記錄被加上排他鎖之後,其他執行緒無法再在該行記錄上增加排他鎖。
我們可以認為獲得排他鎖的執行緒即可獲得分散式鎖,當獲取到鎖之後,可以執行方法的業務邏輯,執行完方法之後,通過connection.commit()
操作來釋放鎖。
這種方法可以有效的解決上面提到的無法釋放鎖和阻塞鎖的問題。
- 阻塞鎖?
for update
語句會在執行成功後立即返回,在執行失敗時一直處於阻塞狀態,直到成功。 - 鎖定之後服務宕機,無法釋放?使用這種方式,服務宕機之後資料庫會自己把鎖釋放掉。
但是還是無法直接解決資料庫單點和可重入問題。
這裡還可能存在另外一個問題,雖然我們對方法欄位名使用了唯一索引,並且顯示使用 for update 來使用行級鎖。但是,MySQL 會對查詢進行優化,即便在條件中使用了索引欄位,但是否使用索引來檢索資料是由 MySQL 通過判斷不同執行計劃的代價來決定的,如果 MySQL 認為全表掃效率更高,比如對一些很小的表,它就不會使用索引,這種情況下 InnoDB 將使用表鎖,而不是行鎖。如果發生這種情況就悲劇了。。。
還有一個問題,就是我們要使用排他鎖來進行分散式鎖的 lock,那麼一個排他鎖長時間不提交,就會佔用資料庫連線。一旦類似的連線變得多了,就可能把資料庫連線池撐爆。
##### 優缺點
優點:簡單,易於理解缺點:會有各種各樣的問題(操作資料庫需要一定的開銷,使用資料庫的行級鎖並不一定靠譜,效能不靠譜)
### 基於 Redis 做分散式鎖
基於 REDIS 的 SETNX()、EXPIRE() 方法做分散式鎖
setnx()
setnx 的含義就是 SET if Not Exists,其主要有兩個引數 setnx(key, value)。該方法是原子的,如果 key 不存在,則設定當前 key 成功,返回 1;如果當前 key 已經存在,則設定當前 key 失敗,返回 0。
expire()
expire 設定過期時間,要注意的是 setnx 命令不能設定 key 的超時時間,只能通過 expire() 來對 key 設定。
使用步驟
1、setnx(lockkey, 1) 如果返回 0,則說明佔位失敗;如果返回 1,則說明佔位成功
2、expire() 命令對 lockkey 設定超時時間,為的是避免死鎖問題。
3、執行完業務程式碼後,可以通過 delete 命令刪除 key。
這個方案其實是可以解決日常工作中的需求的,但從技術方案的探討上來說,可能還有一些可以完善的地方。比如,如果在第一步 setnx 執行成功後,在 expire() 命令執行成功前,發生了宕機的現象,那麼就依然會出現死鎖的問題,所以如果要對其進行完善的話,可以使用 redis 的 setnx()、get() 和 getset() 方法來實現分散式鎖。
基於 REDIS 的 SETNX()、GET()、GETSET()方法做分散式鎖
這個方案的背景主要是在 setnx() 和 expire() 的方案上針對可能存在的死鎖問題,做了一些優化。
getset()
這個命令主要有兩個引數 getset(key,newValue)。該方法是原子的,對 key 設定 newValue 這個值,並且返回 key 原來的舊值。假設 key 原來是不存在的,那麼多次執行這個命令,會出現下邊的效果:
- getset(key, “value1”) 返回 null 此時 key 的值會被設定為 value1
- getset(key, “value2”) 返回 value1 此時 key 的值會被設定為 value2
- 依次類推!
使用步驟
- setnx(lockkey, 當前時間+過期超時時間),如果返回 1,則獲取鎖成功;如果返回 0 則沒有獲取到鎖,轉向 2。
- get(lockkey) 獲取值 oldExpireTime ,並將這個 value 值與當前的系統時間進行比較,如果小於當前系統時間,則認為這個鎖已經超時,可以允許別的請求重新獲取,轉向 3。
- 計算 newExpireTime = 當前時間+過期超時時間,然後 getset(lockkey, newExpireTime) 會返回當前 lockkey 的值currentExpireTime。
- 判斷 currentExpireTime 與 oldExpireTime 是否相等,如果相等,說明當前 getset 設定成功,獲取到了鎖。如果不相等,說明這個鎖又被別的請求獲取走了,那麼當前請求可以直接返回失敗,或者繼續重試。
- 在獲取到鎖之後,當前執行緒可以開始自己的業務處理,當處理完畢後,比較自己的處理時間和對於鎖設定的超時時間,如果小於鎖設定的超時時間,則直接執行 delete 釋放鎖;如果大於鎖設定的超時時間,則不需要再鎖進行處理。
import cn.com.tpig.cache.redis.RedisService;
import cn.com.tpig.utils.SpringUtils;
//redis分散式鎖
public final class RedisLockUtil {
private static final int defaultExpire = 60;
private RedisLockUtil() {
//
}
/**
* 加鎖
* @param key redis key
* @param expire 過期時間,單位秒
* @return true:加鎖成功,false,加鎖失敗
*/
public static boolean lock(String key, int expire) {
RedisService redisService = SpringUtils.getBean(RedisService.class);
long status = redisService.setnx(key, "1");
if(status == 1) {
redisService.expire(key, expire);
return true;
}
return false;
}
public static boolean lock(String key) {
return lock2(key, defaultExpire);
}
/**
* 加鎖
* @param key redis key
* @param expire 過期時間,單位秒
* @return true:加鎖成功,false,加鎖失敗
*/
public static boolean lock2(String key, int expire) {
RedisService redisService = SpringUtils.getBean(RedisService.class);
long value = System.currentTimeMillis() + expire;
long status = redisService.setnx(key, String.valueOf(value));
if(status == 1) {
return true;
}
long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
if(oldExpireTime < System.currentTimeMillis()) {
//超時
long newExpireTime = System.currentTimeMillis() + expire;
long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));
if(currentExpireTime == oldExpireTime) {
return true;
}
}
return false;
}
public static void unLock1(String key) {
RedisService redisService = SpringUtils.getBean(RedisService.class);
redisService.del(key);
}
public static void unLock2(String key) {
RedisService redisService = SpringUtils.getBean(RedisService.class);
long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
if(oldExpireTime > System.currentTimeMillis()) {
redisService.del(key);
}
}
}
public void drawRedPacket(long userId) {
String key = "draw.redpacket.userid:" + userId;
boolean lock = RedisLockUtil.lock2(key, 60);
if(lock) {
try {
//領取操作
} finally {
//釋放鎖
RedisLockUtil.unLock(key);
}
} else {
new RuntimeException("重複領取獎勵");
}
}
基於 REDLOCK 做分散式鎖
Redlock 是 Redis 的作者 antirez 給出的叢集模式的 Redis 分散式鎖,它基於 N 個完全獨立的 Redis 節點(通常情況下 N 可以設定成 5)。
演算法的步驟如下:
- 1、客戶端獲取當前時間,以毫秒為單位。
- 2、客戶端嘗試獲取 N 個節點的鎖,(每個節點獲取鎖的方式和前面說的快取鎖一樣),N 個節點以相同的 key 和 value 獲取鎖。客戶端需要設定介面訪問超時,介面超時時間需要遠遠小於鎖超時時間,比如鎖自動釋放的時間是 10s,那麼介面超時大概設定 5-50ms。這樣可以在有 redis 節點宕機後,訪問該節點時能儘快超時,而減小鎖的正常使用。
- 3、客戶端計算在獲得鎖的時候花費了多少時間,方法是用當前時間減去在步驟一獲取的時間,只有客戶端獲得了超過 3 個節點的鎖,而且獲取鎖的時間小於鎖的超時時間,客戶端才獲得了分散式鎖。
- 4、客戶端獲取的鎖的時間為設定的鎖超時時間減去步驟三計算出的獲取鎖花費時間。
- 5、如果客戶端獲取鎖失敗了,客戶端會依次刪除所有的鎖。
使用 Redlock 演算法,可以保證在掛掉最多 2 個節點的時候,分散式鎖服務仍然能工作,這相比之前的資料庫鎖和快取鎖大大提高了可用性,由於 redis 的高效效能,分散式快取鎖效能並不比資料庫鎖差。
但是,有一位分散式的專家寫了一篇文章《How to do distributed locking》,質疑 Redlock 的正確性。
https://mp.weixin.qq.com/s/1bPLk_VZhZ0QYNZS8LkviA
https://blog.csdn.net/jek123456/article/details/72954106
優缺點
優點: 效能高
缺點:
失效時間設定多長時間為好?如何設定的失效時間太短,方法沒等執行完,鎖就自動釋放了,那麼就會產生併發問題。如果設定的時間太長,其他獲取鎖的執行緒就可能要平白的多等一段時間。
基於 REDISSON 做分散式鎖
redisson 是 redis 官方的分散式鎖元件。GitHub 地址:https://github.com/redisson/redisson
上面的這個問題 ——> 失效時間設定多長時間為好?這個問題在 redisson 的做法是:每獲得一個鎖時,只設置一個很短的超時時間,同時起一個執行緒在每次快要到超時時間時去重新整理鎖的超時時間。在釋放鎖的同時結束這個執行緒。
基於 ZooKeeper 做分散式鎖
ZOOKEEPER 鎖相關基礎知識
- zk 一般由多個節點構成(單數),採用 zab 一致性協議。因此可以將 zk 看成一個單點結構,對其修改資料其內部自動將所有節點資料進行修改而後才提供查詢服務。
- zk 的資料以目錄樹的形式,每個目錄稱為 znode, znode 中可儲存資料(一般不超過 1M),還可以在其中增加子節點。
- 子節點有三種類型。序列化節點,每在該節點下增加一個節點自動給該節點的名稱上自增。臨時節點,一旦建立這個 znode 的客戶端與伺服器失去聯絡,這個 znode 也將自動刪除。最後就是普通節點。
- Watch 機制,client 可以監控每個節點的變化,當產生變化會給 client 產生一個事件。
ZK 基本鎖
- 原理:利用臨時節點與 watch 機制。每個鎖佔用一個普通節點 /lock,當需要獲取鎖時在 /lock 目錄下建立一個臨時節點,建立成功則表示獲取鎖成功,失敗則 watch/lock 節點,有刪除操作後再去爭鎖。臨時節點好處在於當程序掛掉後能自動上鎖的節點自動刪除即取消鎖。
- 缺點:所有取鎖失敗的程序都監聽父節點,很容易發生羊群效應,即當釋放鎖後所有等待程序一起來建立節點,併發量很大。
ZK 鎖優化
- 原理:上鎖改為建立臨時有序節點,每個上鎖的節點均能建立節點成功,只是其序號不同。只有序號最小的可以擁有鎖,如果這個節點序號不是最小的則 watch 序號比本身小的前一個節點 (公平鎖)。
步驟:
- 1.在 /lock 節點下建立一個有序臨時節點 (EPHEMERAL_SEQUENTIAL)。
- 2.判斷建立的節點序號是否最小,如果是最小則獲取鎖成功。不是則取鎖失敗,然後 watch 序號比本身小的前一個節點。
- 3.當取鎖失敗,設定 watch 後則等待 watch 事件到來後,再次判斷是否序號最小。
- 4.取鎖成功則執行程式碼,最後釋放鎖(刪除該節點)。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class DistributedLock implements Lock, Watcher{
private ZooKeeper zk;
private String root = "/locks";//根
private String lockName;//競爭資源的標誌
private String waitNode;//等待前一個鎖
private String myZnode;//當前鎖
private CountDownLatch latch;//計數器
private int sessionTimeout = 30000;
private List<Exception> exception = new ArrayList<Exception>();
/**
* 建立分散式鎖,使用前請確認config配置的zookeeper服務可用
* @param config 127.0.0.1:2181
* @param lockName 競爭資源標誌,lockName中不能包含單詞lock
*/
public DistributedLock(String config, String lockName){
this.lockName = lockName;
// 建立一個與伺服器的連線
try {
zk = new ZooKeeper(config, sessionTimeout, this);
Stat stat = zk.exists(root, false);
if(stat == null){
// 建立根節點
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}
} catch (IOException e) {
exception.add(e);
} catch (KeeperException e) {
exception.add(e);
} catch (InterruptedException e) {
exception.add(e);
}
}
/**
* zookeeper節點的監視器
*/
public void process(WatchedEvent event) {
if(this.latch != null) {
this.latch.countDown();
}
}
public void lock() {
if(exception.size() > 0){
throw new LockException(exception.get(0));
}
try {
if(this.tryLock()){
System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
return;
}
else{
waitForLock(waitNode, sessionTimeout);//等待鎖
}
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
}
public boolean tryLock() {
try {
String splitStr = "_lock_";
if(lockName.contains(splitStr))
throw new LockException("lockName can not contains \\u000B");
//建立臨時子節點
myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(myZnode + " is created ");
//取出所有子節點
List<String> subNodes = zk.getChildren(root, false);
//取出所有lockName的鎖
List<String> lockObjNodes = new ArrayList<String>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if(_node.equals(lockName)){
lockObjNodes.add(node);
}
}
Collections.sort(lockObjNodes);
System.out.println(myZnode + "==" + lockObjNodes.get(0));
if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
//如果是最小的節點,則表示取得鎖
return true;
}
//如果不是最小的節點,找到比自己小1的節點
String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
} catch (KeeperException e) {
throw new LockException(e);
} catch (InterruptedException e) {
throw new LockException(e);
}
return false;
}
public boolean tryLock(long time, TimeUnit unit) {
try {
if(this.tryLock()){
return true;
}
return waitForLock(waitNode,time);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
//判斷比自己小一個數的節點是否存在,如果不存在則無需等待鎖,同時註冊監聽
if(stat != null){
System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
this.latch = new CountDownLatch(1);
this.latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}
public void unlock() {
try {
System.out.println("unlock " + myZnode);
zk.delete(myZnode,-1);
myZnode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
public void lockInterruptibly() throws InterruptedException {
this.lock();
}
public Condition newCondition() {
return null;
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e){
super(e);
}
public LockException(Exception e){
super(e);
}
}
}
優缺點
優點:
有效的解決單點問題,不可重入問題,非阻塞問題以及鎖無法釋放的問題。實現起來較為簡單。
缺點:
效能上可能並沒有快取服務那麼高,因為每次在建立鎖和釋放鎖的過程中,都要動態建立、銷燬臨時節點來實現鎖功能。ZK 中建立和刪除節點只能通過 Leader 伺服器來執行,然後將資料同步到所有的 Follower 機器上。還需要對 ZK的原理有所瞭解。
基於 Consul 做分散式鎖
DD 寫過類似文章,其實主要利用 Consul 的 Key / Value 儲存 API 中的 acquire 和 release 操作來實現。
文章地址:http://blog.didispace.com/spring-cloud-consul-lock-and-semphore/
使用分散式鎖的注意事項
1、注意分散式鎖的開銷
2、注意加鎖的粒度
3、加鎖的方式
總結
無論你身處一個什麼樣的公司,最開始的工作可能都需要從最簡單的做起。不要提阿里和騰訊的業務場景 qps 如何大,因為在這樣的大場景中你未必能親自參與專案,親自參與專案未必能是核心的設計者,是核心的設計者未必能獨自設計。希望大家能根據自己公司業務場景,選擇適合自己專案的方案。