1. 程式人生 > >curator實現zookeeper的分散式鎖

curator實現zookeeper的分散式鎖

基於curator元語實現的分散式鎖種類有好幾種,下面只講解一種實現。InterProcessMutex實現的分散式鎖屬於可重入式鎖,當一個客戶端獲取到lock鎖之後,可以重複呼叫acquire()而不會發生阻塞。基於InterProcessSemaphoreMutex實現的分散式的分散式鎖是不可重入的,當一個客戶端獲取到lock鎖之後,再次呼叫acquire方法獲取鎖時會發生阻塞。基於InterProcessReadWriteLock實現的分散式鎖裡邊包含了讀鎖與寫鎖,其中讀鎖與讀鎖互斥,讀鎖與寫鎖互斥,讀鎖與讀鎖不互斥。

public class DistributeLock {
private static final String PATH = “/example/cache”;
private static final String zkStr=”master:2181,worker1:2181,worker2:2181”;
//private InterProcessReadWriteLock lock;
public static void main(String []args){

    for(int i=0;i<200;i++){
       CuratorFramework client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
       client.start();
       LockExample example=new LockExample(client,PATH,"client"+i);
       try{
           example.doWork(10, TimeUnit.SECONDS);
       }catch (Exception e){

       }finally {
           CloseableUtils.closeQuietly(client);
       }
    }
}

}

public class LockExample {
private CuratorFramework client;
private InterProcessMutex lock;
private String lockPath;
private String clientName;
public LockExample(CuratorFramework client,String lockPath,String clientName){
this.lockPath=lockPath;
this.clientName=clientName;
lock=new InterProcessMutex(client,lockPath);
}

public void doWork(long time,TimeUnit unit )throws  Exception{

    try
    {
        if (lock.acquire(time,unit))
        {
            System.out.println(clientName + " has the lock");
        }
    }
    finally
    {
        System.out.println(clientName + " releasing the lock");
        lock.release(); // always release the lock in a finally block
    }
}

}

基於InterProcessReadWriteLock實現的分散式鎖裡邊包含了讀鎖與寫鎖,其中讀鎖與讀鎖互斥,讀鎖與寫鎖互斥,讀鎖與讀鎖不互斥。驗證程式碼最下所示:不管獲取到讀鎖還是寫鎖都不釋放,通過例子發現先獲取到寫鎖如果不釋放是不可能獲到讀鎖的,先獲到讀鎖如果不釋放是不可能獲到寫鎖的但是其他客戶端client還是會獲取到讀鎖。呼叫writeLock.acquire(time,unit)時time時間越短即使理論上可以獲取到讀鎖,但是失敗的次數也很多,因此應該找一個合適的時間引數進行獲取。例如設定成3秒就還可以,1秒失敗的此處比較多。
public class ReadWriteLockExample {

private static final String     PATH = "/example/cache";
private static final String zkStr="master:2181,worker1:2181,worker2:2181";
private static FakeLimitedResource resource=new FakeLimitedResource();
public static void main(String []args){
    //獲取寫鎖
    new Thread(new Runnable(){
        CuratorFramework client=null;
        public void run() {
            try{
                for(int i=0;i<20;i++){
                    client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
                    client.start();
                    ReadAndWriteLock example=new ReadAndWriteLock(client,resource,PATH,"client"+i);
                    example.doWorkXie(3, TimeUnit.SECONDS);
                }
            }catch (Exception e){

            }finally {
                CloseableUtils.closeQuietly(client);
            }
        }
    }).start();

    //獲取讀鎖
    new Thread(new Runnable(){
        CuratorFramework client=null;
        public void run() {
            try{
                for(int i=0;i<20;i++){
                    client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
                    client.start();
                    ReadAndWriteLock example=new ReadAndWriteLock(client,resource,PATH,"client"+i);
                    example.doWorkDu(3, TimeUnit.SECONDS);
                }
            }catch (Exception e){

            }finally {
                CloseableUtils.closeQuietly(client);
            }
        }
    }).start();

// try{
// CuratorFramework client=null;
// ReadAndWriteLock example=null;
// //先獲取一個寫鎖
// client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
// client.start();
// example=new ReadAndWriteLock(client,resource,PATH,”client”);
// example.doWorkXie(5,TimeUnit.SECONDS);
//
// //先獲取一個讀鎖不釋放,看看能不能
// client= CuratorFrameworkFactory.newClient(zkStr, new ExponentialBackoffRetry(1000, 3));
// client.start();
// example=new ReadAndWriteLock(client,resource,PATH,”client”);
// example.doWorkDu(5,TimeUnit.SECONDS);
//
// }catch(Exception e){}
// finally {
//
// }

}

}
主函式:

/**
* Created by zhangpengshuai on 17/4/24.
*/
public class ReadAndWriteLock {
private InterProcessReadWriteLock lock;
private InterProcessMutex readLock;
private InterProcessMutex writeLock;
private FakeLimitedResource resource;
private String clientName;
public ReadAndWriteLock(CuratorFramework client,FakeLimitedResource resource ,String lockPath, String clientName)
{
this.resource = resource;
this.clientName = clientName;
lock = new InterProcessReadWriteLock(client, lockPath);
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void doWorkDu(long time, TimeUnit unit) throws Exception
{
try{
if(readLock.acquire(time,unit)){
System.out.println(clientName + ” 已得到du鎖”);
}else{
System.out.println(clientName + ” meiyou得到du鎖”);

        }
        //只獲取讀鎖
    }finally {
        //System.out.println(clientName+"  已釋放du鎖");
        //readLock.release();
    }




}

//只獲取讀鎖
public void doWorkXie(long time, TimeUnit unit) throws Exception
{
    try{
        if(writeLock.acquire(time,unit)){
            System.out.println(clientName + " 已得到xie鎖");
        }
        else{
            System.out.println(clientName + " meiyou得到xie鎖");
        }
        //Thread.sleep(1000);
    }finally {
        //System.out.println(clientName + " 已釋放xie鎖");
        //writeLock.release();
    }
}

}