1. 程式人生 > >基於Consul的分散式鎖實現

基於Consul的分散式鎖實現

作者:程式猿DD   原文地址

我們在構建分散式系統的時候,經常需要控制對共享資源的互斥訪問。這個時候我們就涉及到分散式鎖(也稱為全域性鎖)的實現,基於目前的各種工具,我們已經有了大量的實現方式,比如:基於Redis的實現、基於Zookeeper的實現。本文將介紹一種基於Consul 的Key/Value儲存來實現分散式鎖以及訊號量的方法。

分散式鎖實現

基於Consul的分散式鎖主要利用Key/Value儲存API中的acquire和release操作來實現。acquire和release操作是類似Check-And-Set的操作:

  • acquire操作只有當鎖不存在持有者時才會返回true,並且set設定的Value值,同時執行操作的session會持有對該Key的鎖,否則就返回false。
  • release操作則是使用指定的session來釋放某個Key的鎖,如果指定的session無效,那麼會返回false,否則就會set設定Value值,並返回true。

具體實現中主要使用了這幾個Key/Value的API:

基本流程

具體實現

public class Lock {

    private static final String prefix = "lock/";  // 同步鎖引數字首

    private ConsulClient consulClient;
    private String sessionName;
    private String sessionId = null
; private String lockKey; /** * * @param consulClient * @param sessionName 同步鎖的session名稱 * @param lockKey 同步鎖在consul的KV儲存中的Key路徑,會自動增加prefix字首,方便歸類查詢 */ public Lock(ConsulClient consulClient, String sessionName, String lockKey) { this.consulClient = consulClient; this
.sessionName = sessionName; this.lockKey = prefix + lockKey; } /** * 獲取同步鎖 * * @param block 是否阻塞,直到獲取到鎖為止 * @return */ public Boolean lock(boolean block) { if (sessionId != null) { throw new RuntimeException(sessionId + " - Already locked!"); } sessionId = createSession(sessionName); while(true) { PutParams putParams = new PutParams(); putParams.setAcquireSession(sessionId); if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) { return true; } else if(block) { continue; } else { return false; } } } /** * 釋放同步鎖 * * @return */ public Boolean unlock() { PutParams putParams = new PutParams(); putParams.setReleaseSession(sessionId); boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue(); consulClient.sessionDestroy(sessionId, null); return result; } /** * 建立session * @param sessionName * @return */ private String createSession(String sessionName) { NewSession newSession = new NewSession(); newSession.setName(sessionName); return consulClient.sessionCreate(newSession, null).getValue(); } }

單元測試

下面單元測試的邏輯:通過執行緒的方式來模擬不同的分散式服務來競爭鎖。多個處理執行緒同時以阻塞方式來申請分散式鎖,當處理執行緒獲得鎖之後,Sleep一段隨機事件,以模擬處理業務邏輯,處理完畢之後釋放鎖。

public class TestLock {

    private Logger logger = Logger.getLogger(getClass());

    @Test
    public void testLock() throws Exception  {
        new Thread(new LockRunner(1)).start();
        new Thread(new LockRunner(2)).start();
        new Thread(new LockRunner(3)).start();
        new Thread(new LockRunner(4)).start();
        new Thread(new LockRunner(5)).start();
        Thread.sleep(200000L);
    }

    class LockRunner implements Runnable {

        private Logger logger = Logger.getLogger(getClass());
        private int flag;

        public LockRunner(int flag) {
            this.flag = flag;
        }

        @Override
        public void run() {
            Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");
            try {
                if (lock.lock(true)) {
                    logger.info("Thread " + flag + " start!");
                    Thread.sleep(new Random().nextInt(3000L));
                    logger.info("Thread " + flag + " end!");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

}

單元測試執行結果如下:

2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner – Thread 1 start! 2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner – Thread 1 end! 2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner – Thread 3 start! 2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner – Thread 3 end! 2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner – Thread 2 start! 2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner – Thread 2 end! 2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner – Thread 5 start! 2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner – Thread 5 end! 2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner – Thread 4 start! 2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner – Thread 4 end!

從測試結果我們可以看到,通過分散式鎖的形式來控制併發時,多個同步操作只會有一個操作能夠被執行,其他操作只有在等鎖釋放之後才有機會去執行,所以通過這樣的分散式鎖,我們可以控制共享資源同時只能被一個操作進行執行,以保障資料處理時的分散式併發問題。

優化建議

本文我們實現了基於Consul的簡單分散式鎖,但是在實際執行時,可能會因為各種各樣的意外情況導致unlock操作沒有得到正確地執行,從而使得分散式鎖無法釋放。所以為了更完善的使用分散式鎖,我們還必須實現對鎖的超時清理等控制,保證即使出現了未正常解鎖的情況下也能自動修復,以提升系統的健壯性。那麼如何實現呢?請持續關注我的後續分解!

參考文件

實現程式碼