實戰 -- Zookeeper實現分散式鎖
場景分析
比如購買商品的操作,首先獲取商品的庫存,判斷庫存是否充足,充足的話建立訂單減庫存,不充足的話不允許建立訂單。
有一款新型膝上型電腦,庫存只剩下1件的時候,有客戶A、B從不同的客戶端(比如網站和APP)上看中了這款電腦,兩人同時進行下單操作。
A和B同時獲取庫存,A購買1件,庫存為1,判斷充足,建立訂單,庫存減1,B購買1件,庫存為1,判斷充足,建立訂單,庫存減1。
結果是僅剩1件的商品,被兩次下單,庫存變成了-1。顯然這種結果是錯誤的,A和B之間有一人的訂單無法發貨。
如何解決呢
可以用一個鎖,A在下單的時候,給庫存加上一個鎖,此時除了A以外,任何人都不能對庫存進行操作,B在獲取庫存的時候 ,由於A對庫存加上了鎖,所以B只好等待A釋放鎖之後在繼續。
A建立完訂單,對庫存減1,釋放鎖,B獲取鎖再繼續獲取庫存,此時庫存為0,判斷庫存不充足,無法建立訂單。保證了庫存僅剩1的商品只能被下單1次。
這是分散式鎖實現的一種方案。
什麼是分散式鎖
分散式鎖,是控制分散式系統或不同系統之間訪問共享資源的一種鎖的實現,其主要解決的問題就是保證資料一致性。
Zookeeper實現的分散式鎖,是利用節點的操作來進行的,加鎖,就是建立節點(臨時節點),解鎖就是刪除節點,同一個業務都是在同一父節點下進行加鎖和解鎖操作,如果該業務父節點下有子節點,則說明該業務已經被鎖住了,如果沒有子節點,則沒被加鎖。
臨時節點的特點是,當會話失效時,Zookeeper自動清除,避免獲取鎖的客戶端掉線後,沒有刪除鎖節點,而其他客戶端都在等這個鎖節點刪除,產生了死鎖。
實現分散式鎖的兩種方式
一、單節點鎖
在某一業務節點,只允許建立1個子節點,代表鎖,所有客戶端爭搶建立子節點許可權,搶到並建立,則加鎖成功,沒搶到,則等待機會。如圖所示:
- 客戶端準備加鎖的時候,檢視該業務節點下有沒有子節點,如果沒有,則建立節點,此客戶端獲得鎖,執行業務操作。
- 如果存在子節點,則代表當前業務被加鎖,此時客戶端掛起,監聽業務節點的子節點變化
- 客戶端獲取鎖並執行完業務之後,刪除該節點,Zookeeper通知其他客戶端,喚醒掛起,繼續嘗試建立節點。
二、多節點鎖
在單節點鎖中,所有客戶端都操作同一個節點,當只有鎖的客戶端釋放鎖時,其他的客戶端都從掛起狀態中喚醒,來競爭鎖。誰先獲取鎖與客戶端的網路狀態和Zookeeper的伺服器CPU排程等不可控因素有關,和誰先來後到的無關。
如果希望客戶端能按照先來後到的順序來獲取鎖,就需要用多節點鎖來實現,即每個客戶端在同一業務節點下建立專屬自己的順序節點,按照順序節點的序號,來決定誰獲取鎖。如圖:
- 某個客戶端嘗試加鎖時,先在該業務節點下,建立一個順序節點
- 建立完成後,獲取出該業務節點下的所有子節點,並按照按照節點序號排序
- 判斷第一位的節點是否為自己的節點,是的話,代表獲取鎖,執行業務操作
- 不是的話,對排在自己前一位的節點進行監聽,客戶端掛起
- 當客戶端執行業務完畢後,刪除自己的節點,並通知監聽自己節點的客戶端進行業務操作。
多節點程式碼實現
類設計如下
- BusinessTypeEnum列舉,定義了業務的型別,用來區分不同業務,如果要對某個業務加鎖,就在BusinessTypeEnum定義的業務型別下建立節點
- CuatorExt介面,操作Zookeeper的客戶端,定義了一些操作方法
- AbstractCuatorExt類,客戶端CuatorExt介面方法的實現,規範了客戶端基本結構
- BaseDistributedLock類,繼承了AbstractCuatorExt,分散式鎖實現的核心,規範了分散式鎖結構,對它的子類公開獲取鎖的方法。
- DistributedLock介面,分散式鎖對外公開的介面,提供獲取鎖和釋放鎖兩種功能。
- DistributedLockImpl類是對DistributedLock介面的實現
- BuyService類,業務類。
BusinessTypeEnum
public enum BusinessTypeEnum {
items("/items"),
orders("/orders");
private String value;
BusinessTypeEnum(String value){
this.value = value;
}
public String getValue() {
return value;
}
}
CuatorExt
public interface CuatorExt {
/**
* 建立臨時序列節點
* @param basePath
* @return
*/
public String createEphemeralSequential(String basePath) throws Exception;
/**
* 刪除節點
* @param ourPath
*/
public void delete(String ourPath) throws Exception;
/**
* 獲取子節點
* @param basePath
* @return
*/
public List<String> getChildren(String basePath) throws Exception;
}
BaseDistributedLock
public class BaseDistributedLock extends AbstractCuatorExt {
private static final String NAME_SPACE="lock_namespace";
private static final String DISTRIBUTED_LOCK = "/lock-";
BaseDistributedLock(CuratorFramework client) {
super(client);
}
private static final Integer MAX_RETRY_COUNT = 10;//重試次數
public void init(){
this.client = this.client.usingNamespace(NAME_SPACE);
for(BusinessTypeEnum b : BusinessTypeEnum.values()){
try {
if(this.client.checkExists().forPath(b.getValue())==null){
this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(b.getValue());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 獲取鎖,並設定超時時間
* @param time
* @param timeUnit
* @param businessType
* @return
* @throws Exception
*/
protected String attemptLock(long time, TimeUnit timeUnit, BusinessTypeEnum businessType) throws Exception {
boolean goDone = false;
String ourPath = null;
String lockName = null;
long startMillis = System.currentTimeMillis();
int count = 0;
while (!goDone) {
goDone = true;
try {
ourPath = createEphemeralSequential(businessType.getValue()+DISTRIBUTED_LOCK);
lockName = waitToLock(startMillis, time, timeUnit, businessType, ourPath);
} catch (Exception e) {
if (count++ < MAX_RETRY_COUNT) {
goDone = false;
} else {
throw e;
}
}
}
return lockName;
}
private String waitToLock(long startMillis, long time, TimeUnit timeUnit, BusinessTypeEnum businessType, String ourPath) throws Exception {
boolean haveLock = false;
String lockName = null;
Long waitMillis = timeUnit == null ? null : timeUnit.toMillis(time);
boolean doDelete = false;
try {
while (!haveLock) {
List<String> children = getChildrenAndSortThem(businessType.getValue());
int index = children.indexOf(ourPath.substring(( businessType.getValue() + "/").length()));
if (index < 0) {
throw new Exception("無此節點:" + ourPath);
}
if (index == 0) {
haveLock = true;
lockName = ourPath;
} else {
String frontPath = children.get(index-1);
CountDownLatch countDownLatch = new CountDownLatch(1);
getClient().getData().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
countDownLatch.countDown();
lg.info(frontPath + "完成");
}
}).forPath(businessType.getValue()+"/"+frontPath);
if(waitMillis!=null){
waitMillis = System.currentTimeMillis() - startMillis;
if(waitMillis>0){
lg.info(ourPath + "等待" + frontPath + "完成");
countDownLatch.await(waitMillis,timeUnit);
}else{
throw new Exception(ourPath+"等待超時");
}
}else{
lg.info(ourPath + "等待" + frontPath + "完成");
countDownLatch.await();
}
startMillis = System.currentTimeMillis();
}
}
} catch (Exception e) {
doDelete = true;
throw e;
}finally {
if(doDelete){
delete(ourPath);
}
}
return lockName;
}
private List<String> getChildrenAndSortThem(String basePath) {
List<String> children = null;
try {
children = getChildren(basePath);
Collections.sort(children, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
return getLockNumber(o1, basePath.length()) - getLockNumber(o2, basePath.length());
}
});
} catch (Exception e) {
e.printStackTrace();
}
return children;
}
private int getLockNumber(String node, int suff) {
node = node.substring(suff);
return Integer.parseInt(node);
}
}
AbstractCuatorExt
public class AbstractCuatorExt implements CuatorExt {
final static Logger lg = LoggerFactory.getLogger(AbstractCuatorExt.class);
public CuratorFramework client;
AbstractCuatorExt(CuratorFramework client){
this.client = client;
}
public CuratorFramework getClient() {
return client;
}
@Override
public String createEphemeralSequential(String basePath) throws Exception {
String o = this.client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(basePath);
return o;
}
@Override
public void delete(String ourPath) throws Exception {
this.client.delete().forPath(ourPath);
}
@Override
public List<String> getChildren(String basePath) throws Exception {
List<String> children = this.client.getChildren().forPath(basePath);
return children;
}
}
DistributedLock
/**
* 分配鎖
*/
public interface DistributedLock {
/**獲取鎖,如果沒有得到就等待*/
public String acquire(BusinessTypeEnum businessType) throws Exception;
/**
* 獲取鎖,直到超時
* @param time 超時時間
* @param unit time引數的單位
* @return是否獲取到鎖
* @throws Exception
*/
public String acquire(BusinessTypeEnum businessType,long time, TimeUnit unit) throws Exception;
/**
* 釋放鎖
* @throws Exception
*/
public void release(String lockName) throws Exception;
}
DistributedLockImpl
public class DistributedLockImpl extends BaseDistributedLock implements DistributedLock {
DistributedLockImpl(CuratorFramework client) {
super(client);
}
@Override
public String acquire(BusinessTypeEnum businessType) throws Exception {
return attemptLock(0,null,businessType);
}
@Override
public String acquire(BusinessTypeEnum businessType, long time, TimeUnit unit) throws Exception {
return attemptLock(time,unit,businessType);
}
@Override
public void release(String lockName) throws Exception {
delete(lockName);
}
}
BuyService
@Service("byService")
public class ByServiceImpl implements BuyService {
static int i = 0;
Logger lg = LoggerFactory.getLogger(ByServiceImpl.class);
@Autowired
OrderService orderService;
@Autowired
ItemService itemService;
@Autowired
DistributedLock distributedLock;
@Override
public String getLock(String name) {
lg.info("開始獲取鎖");
String lockName = null;
try {
lockName = distributedLock.acquire(BusinessTypeEnum.items);
lg.info(lockName + "進行業務中:");
Thread.sleep(30000);
distributedLock.release(lockName);
} catch (Exception e) {
e.printStackTrace();
return null;
}
lg.info(lockName+"釋放完畢");
return lockName;
}
}
Spring配置
<bean id="distributedLock" class="xin.youhuila.shopdobbo.web.lock.impl.DistributedLock" init-method="init">
<constructor-arg ref="client"/>
</bean>
<bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
<constructor-arg index="0" value="10"/>
<constructor-arg index="1" value="5000"/>
</bean>
<bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
<constructor-arg index="0" value="localhost:2181"/>
<constructor-arg index="1" value="10000"/>
<constructor-arg index="2" value="5000"/>
<constructor-arg index="3" ref="retryPolicy"/>
</bean>
效果圖
單節點鎖程式碼實現
比較簡單,直接看程式碼吧
public class DistributedLock {
private CuratorFramework client = null;
final static Logger lg =LoggerFactory.getLogger(DistributedLockImooc.class);
private static CountDownLatch zkLocklatch = new CountDownLatch(1);
private static final String ZK_LOCK_PROJECT = "imooc-locks";
private static final String DISTRIBUTED_LOCK = "distributed_lock";
public DistributedLockImooc(CuratorFramework client) {
this.client = client;
}
private void init(){
client = client.usingNamespace("ZKLocks-Namespace");
try {
if(client.checkExists().forPath("/".concat(ZK_LOCK_PROJECT))==null){
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/"+ZK_LOCK_PROJECT);
}
addWatcherToLock("/"+ZK_LOCK_PROJECT);
} catch (Exception e) {
e.printStackTrace();
}
}
private void addWatcherToLock(String path) throws Exception {
final PathChildrenCache cache = new PathChildrenCache(client,path,true);
cache.start( PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener(){
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
String path = event.getData().getPath();
lg.info("上一個會話已釋放鎖或斷開"+path);
lg.info("釋放計數器,讓當前請求來獲得分散式鎖");
zkLocklatch.countDown();
}
}
});
}
public boolean releaseLock(){
try {
if(client.checkExists().forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK)!=null){
client.delete().forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
lg.info("釋放成功");
return true;
}
public void getLock(){
int i = 0;
while(true){
try {
client.create().creatingParentsIfNeeded().withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/"+ZK_LOCK_PROJECT+"/"+DISTRIBUTED_LOCK);
lg.info("獲取分散式鎖成功");
return;
} catch (Exception e) {
lg.info("獲取分散式鎖失敗");
try {
if(zkLocklatch.getCount()<=0){
zkLocklatch = new CountDownLatch(1);
}
zkLocklatch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
lg.info("第"+i+"嘗試此獲取鎖");
}
}
}
}