1. 程式人生 > >Curator教程(三)分散式鎖

Curator教程(三)分散式鎖

共享鎖

@Testpublic void sharedLock() throws Exception {    // 建立共享鎖
    InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);    // lock2 用於模擬其他客戶端
    InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);    // 獲取鎖物件
    lock.acquire();    // 測試是否可以重入
    // 超時獲取鎖物件(第一個引數為時間, 第二個引數為時間單位), 因為鎖已經被獲取, 所以返回 false
    Assert.assertFalse(lock.acquire(2, TimeUnit.SECONDS));    // 釋放鎖
    lock.release();    // lock2 嘗試獲取鎖成功, 因為鎖已經被釋放
    Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
    lock2.release();
}

共享可重入鎖

public void sharedReentrantLock() throws Exception {    // 建立可重入鎖
    InterProcessLock lock = new InterProcessMutex(client, lockPath);    // lock2 用於模擬其他客戶端
    InterProcessLock lock2 = new InterProcessMutex(client2, lockPath);    // lock 獲取鎖
    lock.acquire();    try {        // lock 第二次獲取鎖
        lock.acquire();        try {            // lock2 超時獲取鎖, 因為鎖已經被 lock 客戶端佔用, 所以獲取失敗, 需要等 lock 釋放
            Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));
        } finally {
            lock.release();
        }
    } finally {        // 重入鎖獲取與釋放需要一一對應, 如果獲取 2 次, 釋放 1 次, 那麼該鎖依然是被佔用, 如果將下面這行程式碼註釋, 那麼會發現下面的 lock2 獲取鎖失敗
        lock.release();
    }    // 在 lock 釋放後, lock2 能夠獲取鎖
    Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
    lock2.release();
}

共享可重入讀寫鎖

@Testpublic void sharedReentrantReadWriteLock() throws Exception {    // 建立讀寫鎖物件, Curator 以公平鎖的方式進行實現
    InterProce***eadWriteLock lock = new InterProce***eadWriteLock(client, lockPath);    // lock2 用於模擬其他客戶端
    InterProce***eadWriteLock lock2 = new InterProce***eadWriteLock(client2, lockPath);    // 使用 lock 模擬讀操作
    // 使用 lock2 模擬寫操作
    // 獲取讀鎖(使用 InterProcessMutex 實現, 所以是可以重入的)
    InterProcessLock readLock = lock.readLock();    // 獲取寫鎖(使用 InterProcessMutex 實現, 所以是可以重入的)
    InterProcessLock writeLock = lock2.writeLock();    /**
     * 讀寫鎖測試物件
     */
    class ReadWriteLockTest {        // 測試資料變更欄位
        private Integer testData = 0;        private Set<Thread> threadSet = new HashSet<>();        // 寫入資料
        private void write() throws Exception {
            writeLock.acquire();            try {
                Thread.sleep(10);
                testData++;
                System.out.println("寫入資料 \ t" + testData);
            } finally {
                writeLock.release();
            }
        }        // 讀取資料
        private void read() throws Exception {
            readLock.acquire();            try {
                Thread.sleep(10);
                System.out.println("讀取資料 \ t" + testData);
            } finally {
                readLock.release();
            }
        }        // 等待執行緒結束, 防止 test 方法呼叫完成後, 當前執行緒直接退出, 導致控制檯無法輸出資訊
        public void waitThread() throws InterruptedException {            for (Thread thread : threadSet) {
                thread.join();
            }
        }        // 建立執行緒方法
        private void createThread(int type) {
            Thread thread = new Thread(new Runnable() {                @Override
                public void run() {                    try {                        if (type == 1) {
                            write();
                        } else {
                            read();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            threadSet.add(thread);
            thread.start();
        }        // 測試方法
        public void test() {            for (int i = 0; i < 5; i++) {
                createThread(1);
            }            for (int i = 0; i < 5; i++) {
                createThread(2);
            }
        }
    }

    ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
    readWriteLockTest.test();
    readWriteLockTest.waitThread();
}

測試結果如下:

寫入資料 1
寫入資料 2
讀取資料 2
寫入資料 3
讀取資料 3
寫入資料 4
讀取資料 4
讀取資料 4
寫入資料 5
讀取資料 5