利用consul在spring boot中實現最簡單的分散式鎖
因為在專案實際過程中所採用的是微服務架構,考慮到承載量基本每個相同業務的服務都是多節點部署,所以針對某些資源的訪問就不得不用到用到分散式鎖了。
這裡列舉一個最簡單的場景,假如有一個智慧售貨機,由於機器本身的原因不能同一臺機器不能同時出兩個商品,這就要求在在出貨流程前針對同一臺機器在同一時刻出現併發
建立訂單時只能有一筆訂單建立成功,但是訂單服務是多節點部署的,所以就不得不用到分散式鎖了。
以上只是一種簡單的業務場景,在各種大型網際網路實際應用中,需要分散式鎖的業務場景會更多,綜合比較了業界基於各種中介軟體來實現的分散式鎖方案,然後結合實際業務最終
決定採用consul來實現,因為我們的專案中採用了consul做註冊中心,並且consul天生可以保證一致性(這點類似zk),當然zk也能實現分散式鎖,但是這裡不對這點做過多討論。
redis雖然也能實現分散式鎖,但是可能因為場景比較複雜,如果redis採用cluster部署的話,如果某一主節點出現故障的話,有一定機率會出現腦裂現象,這樣就可能會讓競爭者在
併發時同時獲得到鎖,這樣可能會破壞掉後面的業務,當然出現這種情況的概率很低,但是也不能完全排除,因為redis的根本不能保證強一致性導致的。
好了,這裡說的最簡單的分散式鎖的意思是,多個競爭者同一時間併發去獲得鎖時,獲取失敗的就直接返回了,獲取成功的繼續後續的流程,然後在合適的時間釋放鎖,並且為鎖
加了超時時間,防止獲得到鎖的程序或執行緒在未來得及釋放鎖時自己掛掉了,導致資源處於一直被鎖定的狀態無法得到釋放。主要的實現邏輯就是這樣,如果有人想實現獲得鎖失
敗的競爭者一直繼續嘗試獲得,可以基於該示例進行修改,加上自旋邏輯就OK。
以下是鎖實現程式碼:
1 package com.lyb.consullock; 2 3 import com.ecwid.consul.v1.ConsulClient; 4 import com.ecwid.consul.v1.agent.model.NewCheck; 5 import com.ecwid.consul.v1.kv.model.PutParams; 6 import com.ecwid.consul.v1.session.model.NewSession; 7 import com.ecwid.consul.v1.session.model.Session; 8 import lombok.Data; 9 10 11 import java.time.LocalDateTime; 12 import java.util.ArrayList; 13 import java.util.List; 14 15 16 public class DistributedLock{ 17 private ConsulClient consulClient; 18 19 /** 20 * 建構函式 21 * @param consulHost 註冊consul的client或服務端的Ip或主機名,或域名 22 * @param consulPort 埠號 23 */ 24 public DistributedLock(String consulHost,int consulPort){ 25 consulClient = new ConsulClient(consulHost,consulPort); 26 } 27 28 /** 29 * 獲得鎖的方法 30 * @param lockName 競爭的資源名 31 * @param ttlSeconds 鎖的超時時間,超過該時間自動釋放 32 * @return 33 */ 34 public LockContext getLock(String lockName,int ttlSeconds){ 35 LockContext lockContext = new LockContext(); 36 if(ttlSeconds<10 || ttlSeconds > 86400) ttlSeconds = 60; 37 String sessionId = createSession(lockName,ttlSeconds); 38 boolean success = lock(lockName,sessionId); 39 if(success == false){ 40 consulClient.sessionDestroy(sessionId,null); 41 lockContext.setGetLock(false); 42 43 return lockContext; 44 } 45 46 lockContext.setSession(sessionId); 47 lockContext.setGetLock(true); 48 49 return lockContext; 50 } 51 52 /** 53 * 釋放鎖 54 * @param sessionID 55 */ 56 public void releaseLock(String sessionID){ 57 consulClient.sessionDestroy(sessionID,null); 58 } 59 60 private String createSession(String lockName,int ttlSeconds){ 61 NewCheck check = new NewCheck(); 62 check.setId("check "+lockName); 63 check.setName(check.getId()); 64 check.setTtl(ttlSeconds+"s"); //該值和session ttl共同決定決定鎖定時長 65 check.setTimeout("10s"); 66 consulClient.agentCheckRegister(check); 67 consulClient.agentCheckPass(check.getId()); 68 69 NewSession session = new NewSession(); 70 session.setBehavior(Session.Behavior.RELEASE); 71 session.setName("session "+lockName); 72 session.setLockDelay(1); 73 session.setTtl(ttlSeconds + "s"); //和check ttl共同決定鎖時長 74 List<String> checks = new ArrayList<>(); 75 checks.add(check.getId()); 76 session.setChecks(checks); 77 String sessionId = consulClient.sessionCreate(session,null).getValue(); 78 79 return sessionId; 80 } 81 82 private boolean lock(String lockName,String sessionId){ 83 PutParams putParams = new PutParams(); 84 putParams.setAcquireSession(sessionId); 85 86 boolean isSuccess = consulClient.setKVValue(lockName,"lock:"+ LocalDateTime.now(),putParams).getValue(); 87 88 return isSuccess; 89 } 90 91 /** 92 * 競爭鎖時返回的物件 93 */ 94 @Data 95 public class LockContext{ 96 /** 97 * 獲得鎖成功返回該值,比便後面用該值來釋放鎖 98 */ 99 private String session; 100 /** 101 * 是否獲得到鎖 102 */ 103 private boolean isGetLock; 104 } 105 }
pom檔案
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.lyb</groupId> <artifactId>consul-lock</artifactId> <version>0.0.1-SNAPSHOT</version> <name>consul-lock</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Greenwich.SR2</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-consul-discovery</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
測試程式碼:
package com.lyb.consullock; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @RunWith(SpringRunner.class) @SpringBootTest public class ConsulLockApplicationTests { @Autowired private ServiceConfig serviceConfig; @Test public void lockSameResourer() { //針對相同資源在同一時刻只有一個執行緒會獲得鎖 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int a=0;a<20;a++){ threadPool.submit( () -> { for (int i = 0;i < 100; i++) { DistributedLock lock = new DistributedLock( serviceConfig.getConsulRegisterHost(), serviceConfig.getConsulRegisterPort()); DistributedLock.LockContext lockContext = lock.getLock("test lock", 10); if (lockContext.isGetLock()) { System.out.println(Thread.currentThread().getName() + "獲得了鎖"); try { TimeUnit.SECONDS.sleep(1); lock.releaseLock(lockContext.getSession()); } catch (InterruptedException e) { e.printStackTrace(); } }else { //System.out.println(Thread.currentThread().getName() + "沒有獲得鎖"); } } }); } try { TimeUnit.MINUTES.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void lockDiffResource(){ //針對不通的資源所有執行緒都應該能獲得鎖 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int a=0;a<20;a++){ threadPool.submit( () -> { for (int i = 0;i < 100; i++) { DistributedLock lock = new DistributedLock( serviceConfig.getConsulRegisterHost(), serviceConfig.getConsulRegisterPort()); DistributedLock.LockContext lockContext = lock.getLock("test lock"+Thread.currentThread().getName(), 10); if (lockContext.isGetLock()) { System.out.println(Thread.currentThread().getName() + "獲得了鎖"); try { TimeUnit.SECONDS.sleep(1); lock.releaseLock(lockContext.getSession()); } catch (InterruptedException e) { e.printStackTrace(); } }else { //System.out.println(Thread.currentThread().getName() + "沒有獲得鎖"); Assert.assertTrue(lockContext.isGetLock()); } } }); } try { TimeUnit.MINUTES.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }
希望對大家有所幫助
專案路徑:
https://github.com/wenwuxianren/consul-