跟著例項學習ZooKeeper的用法: 分散式鎖
鎖
分散式的鎖全域性同步, 這意味著任何一個時間點不會有兩個客戶端都擁有相同的鎖。
可重入鎖Shared Reentrant Lock
首先我們先看一個全域性可重入的鎖。 Shared意味著鎖是全域性可見的, 客戶端都可以請求鎖。 Reentrant和JDK的ReentrantLock類似, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。 它是由類InterProcessMutex
來實現。 它的建構函式為:
public InterProcessMutex(CuratorFramework client, String path)
通過acquire
獲得鎖,並提供超時機制:
public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()
public boolean acquire(long time,
TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can
call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()
Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not
通過release()
方法釋放鎖。 InterProcessMutex 例項可以重用。
Revoking ZooKeeper recipes wiki定義了可協商的撤銷機制。 為了撤銷mutex, 呼叫下面的方法:
public void makeRevocable(RevocationListener<T> listener)
將鎖設為可撤銷的. 當別的程序或執行緒想讓你釋放鎖是Listener會被呼叫。
Parameters:
listener - the listener
如果你請求撤銷當前的鎖, 呼叫Revoker
方法。
public static void attemptRevoke(CuratorFramework client,
String path)
throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
錯誤處理 還是強烈推薦你使用ConnectionStateListener
處理連線狀態的改變。 當連線LOST時你不再擁有鎖。
首先讓我們建立一個模擬的共享資源, 這個資源期望只能單執行緒的訪問,否則會有併發問題。
package com.colobu.zkrecipe.lock;
import java.util.concurrent.atomic.AtomicBoolean;
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
public void use() throws InterruptedException {
// 真實環境中我們會在這裡訪問/維護一個共享的資源
//這個例子在使用鎖的情況下不會非法併發異常IllegalStateException
//但是在無鎖的情況由於sleep了一段時間,很容易丟擲異常
if (!inUse.compareAndSet(false, true)) {
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (3 * Math.random()));
} finally {
inUse.set(false);
}
}
}
然後建立一個ExampleClientThatLocks
類, 它負責請求鎖, 使用資源,釋放鎖這樣一個完整的訪問過程。
package com.colobu.zkrecipe.lock;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
public class ExampleClientThatLocks {
private final InterProcessMutex lock;
private final FakeLimitedResource resource;
private final String clientName;
public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
lock = new InterProcessMutex(client, lockPath);
}
public void doWork(long time, TimeUnit unit) throws Exception {
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
try {
System.out.println(clientName + " has the lock");
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
lock.release(); // always release the lock in a finally block
}
}
}
最後建立主程式來測試。
package com.colobu.zkrecipe.lock;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class InterProcessMutexExample {
private static final int QTY = 5;
private static final int REPETITIONS = QTY * 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
final TestingServer server = new TestingServer();
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
CloseableUtils.closeQuietly(server);
}
}
}
程式碼也很簡單,生成10個client, 每個client重複執行10次 請求鎖–訪問資源–釋放鎖的過程。每個client都在獨立的執行緒中。 結果可以看到,鎖是隨機的被每個例項排他性的使用。
既然是可重用的,你可以在一個執行緒中多次呼叫acquire
,線上程擁有鎖時它總是返回true。
你不應該在多個執行緒中用同一個InterProcessMutex, 你可以在每個執行緒中都生成一個InterProcessMutex例項,它們的path都一樣,這樣它們可以共享同一個鎖。
不可重入鎖Shared Lock
這個鎖和上面的相比,就是少了Reentrant
的功能,也就意味著它不能在同一個執行緒中重入。 這個類是InterProcessSemaphoreMutex
。 使用方法和上面的類類似。
首先我們將上面的例子修改一下,測試一下它的重入。 修改ExampleClientThatLocks.doWork
,連續兩次acquire
:
public void doWork(long time, TimeUnit unit) throws Exception {
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
System.out.println(clientName + " has the lock");
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
System.out.println(clientName + " has the lock again");
try {
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
lock.release(); // always release the lock in a finally block
lock.release(); // always release the lock in a finally block
}
}
注意我們也需要呼叫release
兩次。這和JDK的ReentrantLock用法一致。如果少呼叫一次release
,則此執行緒依然擁有鎖。 上面的程式碼沒有問題,我們可以多次呼叫acquire
,後續的acquire
也不會阻塞。 將上面的InterProcessMutex
換成不可重入鎖InterProcessSemaphoreMutex
,如果再執行上面的程式碼,結果就會發現執行緒被阻塞再第二個acquire
上。 也就是此鎖不是可重入的。
可重入讀寫鎖Shared Reentrant Read Write Lock
類似JDK的ReentrantReadWriteLock
. 一個讀寫鎖管理一對相關的鎖。 一個負責讀操作,另外一個負責寫操作。 讀操作在寫鎖沒被使用時可同時由多個程序使用,而寫鎖使用時不允許讀 (阻塞)。 此鎖是可重入的。一個擁有寫鎖的執行緒可重入讀鎖,但是讀鎖卻不能進入寫鎖。 這也意味著寫鎖可以降級成讀鎖, 比如請求寫鎖 —>讀鎖 —->釋放寫鎖。 從讀鎖升級成寫鎖是不成的。
主要由兩個類實現:
- InterProcessReadWriteLock
- InterProcessLock
使用時首先建立一個InterProcessReadWriteLock
例項,然後再根據你的需求得到讀鎖或者寫鎖, 讀寫鎖的型別是InterProcessLock
。
public InterProcessLock readLock()
public InterProcessLock writeLock()
例子和上面的類似。
package com.colobu.zkrecipe.lock;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
public class ExampleClientReadWriteLocks {
private final InterProcessReadWriteLock lock;
private final InterProcessMutex readLock;
private final InterProcessMutex writeLock;
private final FakeLimitedResource resource;
private final String clientName;
public ExampleClientReadWriteLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
lock = new InterProcessReadWriteLock(client, lockPath);
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void doWork(long time, TimeUnit unit) throws Exception {
if (!writeLock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the writeLock");
}
System.out.println(clientName + " has the writeLock");
if (!readLock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the readLock");
}
System.out.println(clientName + " has the readLock too");
try {
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
readLock.release(); // always release the lock in a finally block
writeLock.release(); // always release the lock in a finally block
}
}
}
在這個類中我們首先請求了一個寫鎖, 然後降級成讀鎖。 執行業務處理,然後釋放讀寫鎖。
訊號量Shared Semaphore
一個計數的訊號量類似JDK的Semaphore。 JDK中Semaphore維護的一組許可(permits),而Cubator中稱之為租約(Lease)。 有兩種方式可以決定semaphore的最大租約數。第一種方式是有使用者給定的path決定。第二種方式使用SharedCountReader
類。 如果不使用SharedCountReader, 沒有內部程式碼檢查程序是否假定有10個租約而程序B假定有20個租約。 所以所有的例項必須使用相同的numberOfLeases值.
這次呼叫acquire
會返回一個租約物件。 客戶端必須在finally中close這些租約物件,否則這些租約會丟失掉。 但是, 但是,如果客戶端session由於某種原因比如crash丟掉, 那麼這些客戶端持有的租約會自動close, 這樣其它客戶端可以繼續使用這些租約。 租約還可以通過下面的方式返還:
public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)
注意一次你可以請求多個租約,如果Semaphore當前的租約不夠,則請求執行緒會被阻塞。 同時還提供了超時的過載方法。
public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
主要類有:
- InterProcessSemaphoreV2
- Lease
- SharedCountReader
下面是使用的例子:
package com.colobu.zkrecipe.lock;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
public class InterProcessSemaphoreExample {
private static final int MAX_LEASE = 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
FakeLimitedResource resource = new FakeLimitedResource();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
Collection<Lease> leases = semaphore.acquire(5);
System.out.println("get " + leases.size() + " leases");
Lease lease = semaphore.acquire();
System.out.println("get another lease");
resource.use();
Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("Should timeout and acquire return " + leases2);
System.out.println("return one lease");
semaphore.returnLease(lease);
System.out.println("return another 5 leases");
semaphore.returnAll(leases);
}
}
}
首先我們先獲得了5個租約, 最後我們把它還給了semaphore。 接著請求了一個租約,因為semaphore還有5個租約,所以請求可以滿足,返回一個租約,還剩4個租約。 然後再請求一個租約,因為租約不夠,阻塞到超時,還是沒能滿足,返回結果為null。
上面說講的鎖都是公平鎖(fair)。 總ZooKeeper的角度看, 每個客戶端都按照請求的順序獲得鎖。 相當公平。
多鎖物件 Multi Shared Lock
Multi Shared Lock是一個鎖的容器。 當呼叫acquire
, 所有的鎖都會被acquire
,如果請求失敗,所有的鎖都會被release。 同樣呼叫release時所有的鎖都被release(失敗被忽略)。 基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。
主要涉及兩個類:
- InterProcessMultiLock
- InterProcessLock
它的建構函式需要包含的鎖的集合,或者一組ZooKeeper的path。
public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
用法和Shared Lock相同。
例子如下:
package com.colobu.zkrecipe.lock;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
public class InterProcessMultiLockExample {
private static final String PATH1 = "/examples/locks1";
private static final String PATH2 = "/examples/locks2";
public static void main(String[] args) throws Exception {
FakeLimitedResource resource = new FakeLimitedResource();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("could not acquire the lock");
}
System.out.println("has the lock");
System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());
try {
resource.use(); //access resource exclusively
} finally {
System.out.println("releasing the lock");
lock.release(); // always release the lock in a finally block
}
System.out.println("has the lock1: " + lock1.isAcquiredInThisProcess());
System.out.println("has the lock2: " + lock2.isAcquiredInThisProcess());
}
}
}
新建一個InterProcessMultiLock, 包含一個重入鎖和一個非重入鎖。 呼叫acquire
後可以看到執行緒同時擁有了這兩個鎖。 呼叫release
看到這兩個鎖都被釋放了。
再重申以便, 強烈推薦使用ConnectionStateListener監控連線的狀態。