1. 程式人生 > 其它 >阿里一面,說說你瞭解zookeeper的應用場景有哪些?

阿里一面,說說你瞭解zookeeper的應用場景有哪些?

1、前言

又到了金三銀四的時候,大家都按耐不住內心的躁動,我在這裡給大家分享下之前面試中遇到的一個知識點(zookeeper應用場景),希望對大家有些幫助。如有不足,歡迎大佬們指點指點。

2、zookeeper簡介

ZooKeeper 是分散式應用程式的分散式開源協調服務。它公開了一組簡單的api,分散式應用程式可以基於這些api實現更高級別的同步、配置維護、分組和命名服務。它被設計為易於程式設計,並使用一種資料模型,該模型以熟悉的檔案系統目錄樹結構為風格。它在 Java 中執行,並具有 Java 和 C 的繫結。

眾所周知,協調服務很難做好。它們特別容易出現競爭條件和死鎖等錯誤。ZooKeeper背後的動機是減輕分散式應用程式從頭開始實現協調服務的負擔。

3、zookeeper應用場景

下面的程式碼都需要一個序列化類,所以放在最前面宣告

/**
 * @author admin
 */
public class MyZkSerializer implements ZkSerializer {

	String charset = "UTF-8";

	@Override
	public Object deserialize(byte[] bytes) throws ZkMarshallingError {
		try {
			return new String(bytes, charset);
		} catch (UnsupportedEncodingException e) {
			throw new ZkMarshallingError(e);
		}
	}

	@Override
	public byte[] serialize(Object obj) throws ZkMarshallingError {
		try {
			return String.valueOf(obj).getBytes(charset);
		} catch (UnsupportedEncodingException e) {
			throw new ZkMarshallingError(e);
		}
	}
}

3.1 配置中心

3.1.1 什麼是配置中心呢?

假設咱們的專案部署在5臺機子上形成一個叢集,那麼這5個例項在啟動時讀取的配置資訊應該是一樣的,同時一旦咱們的配置資訊更改了,需要馬上通知到這5個例項上並生效,這就是配置中心的功能。

3.1.2 zookeeper怎麼實現配置中心呢?

必要條件

1、znode能儲存資料
2、Watch能監聽資料改變

實現方式

  1. 一個配置項對應一個zNode
// 1 將單個配置放到zookeeper上
public void putZk() {
	ZkClient client = new ZkClient("192.168.10.11:2181");
	client.setZkSerializer(new MyZkSerializer());
	String configPath = "/config1";
	String value = "1111111";
	if (client.exists(configPath)) {
		client.writeData(configPath, value);
	} else {
		client.createPersistent(configPath, value);
	}
	client.close();
}
// 需要配置的服務都從zk上取,並註冊watch來實時獲得配置更新
public void getConfigFromZk() {
	ZkClient client = new ZkClient("192.168.10.11:2181");
	client.setZkSerializer(new MyZkSerializer());
	String configPath = "/config1";
	String value = client.readData(configPath);
	System.out.println("從zk讀到配置config1的值為:" + value);
	// 監控配置的更新,基於watch實現釋出訂閱功能
	client.subscribeDataChanges(configPath, new IZkDataListener() {
		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			// TODO 配置刪除業務處理
		}
	
		@Override
		public void handleDataChange(String dataPath, Object data) throws Exception {
			System.out.println("獲得更新的配置值:" + data);
		}
	});
	
	// 這裡只是為演示實時獲取到配置值更新而加的等待。實際專案應用中根據具體場景寫(可用阻塞方式)
	try {
		Thread.sleep(5 * 60 * 1000);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}

}
  1. 一個配置檔案對應一個zNode
// 將配置檔案的內容存放到zk節點上
public void putConfigFile2ZK() throws IOException {

	File f = new File(this.getClass().getResource("/config.xml").getFile());
	FileInputStream fin = new FileInputStream(f);
	byte[] datas = new byte[(int) f.length()];
	fin.read(datas);
	fin.close();

	ZkClient client = new ZkClient("192.168.10.11:2181");
	client.setZkSerializer(new BytesPushThroughSerializer());
	String configPath = "/config2";
	if (client.exists(configPath)) {
		client.writeData(configPath, datas);
	} else {
		client.createPersistent(configPath, datas);
	}
	client.close();
}

獲取整個配置檔案的方式跟步驟1類似,只不過需要解析對應的配置檔案而已。

3.2 命名服務(註冊中心)

3.2.1 什麼是註冊中心?

註冊中心主要儲存註冊例項應用的名稱和ip地址,供其他服務通過RPC來呼叫,其他服務只關心你的服務名是啥,而不必關心你的伺服器地址對不對,有沒有上線。

3.2.2 zookeeper怎麼實現註冊中心呢?

首先是服務發現問題,當一個例項啟動後會向zookeeper建立一個臨時節點,並存入自己的服務資訊(包括應用名和ip等),其他服務通過zookeeper拿到該例項的註冊資訊即可呼叫。

一旦該服務宕機了或者主動下線,那麼該臨時節點則會被刪除,其他服務通過watch監聽到下線通知,也就不會在去呼叫該服務。

3.3 Master選舉

3.3.1 什麼是Master選舉?

在一個主從部署的叢集裡,一般master例項負責所有請求的讀寫功能,其他slave例項同步master的資料,一旦master節點不可用了,那麼就需要從他的slave例項中重新選舉一個節點作為master例項。

3.3.2 zookeeper怎麼實現Master選舉呢?

首先是例項去競爭建立臨時決定(Master節點),誰建立成功誰就是master,否則就是slave。
同時所有的例項都需要去servers節點(臨時節點)註冊自己的服務資訊,方便通過該節點獲取到所有線上的例項,有點類似註冊中心的意思。

下面咱們通過程式碼來模擬一下master選舉

/**
 * @author yinfeng
 */
public class Server {

        private final String cluster;
        private final String name;
        private final String address;

        private final String path, value;

        private String master;

        public Server(String cluster, String name, String address) {
            super();
            this.cluster = cluster;
            this.name = name;
            this.address = address;
            path = "/" + this.cluster + "Master";
            value = "name:" + name + " address:" + address;

            final ZkClient client = new ZkClient("192.168.10.11:2181");
            client.setZkSerializer(new MyZkSerializer());

            final Thread thread = new Thread(() -> {
                electionMaster(client);
            });
            thread.setDaemon(true);
            thread.start();
        }
		/**
		* 選舉方法
		**/
        public void electionMaster(ZkClient client) {
            try {
                client.createEphemeral(path, value);
                master = client.readData(path);
                System.out.println(value + "建立節點成功,成為Master");
            } catch (ZkNodeExistsException e) {
                master = client.readData(path);
                System.out.println("Master為:" + master);
            }

            // 為阻塞自己等待而用
            final CountDownLatch cdl = new CountDownLatch(1);

            // 註冊watcher
            IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println("-----監聽到節點被刪除");
                    cdl.countDown();
                }
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }
            };

            client.subscribeDataChanges(path, listener);

            // 讓自己阻塞
            if (client.exists(path)) {
                try {
                    cdl.await();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            // 醒來後,取消watcher
            client.unsubscribeDataChanges(path, listener);
            // 遞迴調自己(下一次選舉)
            electionMaster(client);
        }
    }

咱們通過啟動多個服務來看看是否測試成功

public static void main(String[] args) {
    // 測試時,依次開啟多個Server例項java程序,然後停止獲取的master的節點,看誰搶到Master
    Server s = new Server("cluster1", "server1", "192.168.1.11:8991");
    Server s1 = new Server("cluster1", "server2", "192.168.1.11:8992");
    Server s2 = new Server("cluster1", "server3", "192.168.1.11:8993");
    Server s3 = new Server("cluster1", "server4", "192.168.1.11:8994");
    try {
        Thread.sleep(100000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}


可以看到功能一切正常

3.4 分散式佇列

3.4.1 什麼是分散式佇列?

佇列的定義是先進先出,而在分散式環境下保證先進先出的佇列就是分散式佇列,有點類似於訊息佇列。

3.4.2 zookeeper怎麼實現分散式佇列?

由上圖可知,zookeeper主要通過順序節點來保證佇列的先進先出。

3.5 分散式鎖

3.5.1 什麼是分散式鎖?

分散式鎖指的是控制分散式系統不同程序共同訪問共享資源的一種鎖的實現。 如果在不同的系統或同一個系統的不同主機之間共享和競爭某個臨界資源,往往需要互斥來防止彼此干擾,避免出現髒資料或非業務資料,保證資料一致性。

3.5.2 zookeeper通過臨時節點實現布式鎖?

實現原理是zookeeper節點不可重名和watch的監聽通知機制,使用臨時節點主要是為了避免獲取鎖的節點由於異常原因無法釋放鎖而導致出現死鎖情況。


競爭鎖流程如下圖:

程式碼實現如下

/**
 * @author yinfeng
 */
public class ZKDistributeLock implements Lock {

    private String lockPath;

    private ZkClient client;

    // 鎖重入計數
    private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();

    public ZKDistributeLock(String lockPath) {
        super();
        this.lockPath = lockPath;
        client = new ZkClient("192.168.10.11:2181");
        client.setZkSerializer(new MyZkSerializer());
    }

    @Override
    public boolean tryLock() { 
    	// 鎖重入不會阻塞
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 0) {
                this.reentrantCount.set(++count);
                return true;
            }
        }
        // 建立節點
        try {
            client.createEphemeral(lockPath);
            this.reentrantCount.set(1);
        } catch (ZkNodeExistsException e) {
            return false;
        }
        return true;
    }

    @Override
    public void unlock() {
        // 重入釋進行放鎖處理
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 1) {
                this.reentrantCount.set(--count);
                return;
            } else {
                this.reentrantCount.set(null);
            }
        }
        client.delete(lockPath);
    }

    @Override
    public void lock() { 
        // 如果獲取不到鎖,阻塞等待
        if (!tryLock()) {
            // 沒獲得鎖,阻塞自己
            waitForLock();
            // 再次嘗試
            lock();
        }

    }

    private void waitForLock() {
        final CountDownLatch cdl = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("----收到節點被刪除了-------------");
                cdl.countDown();
            }
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        client.subscribeDataChanges(lockPath, listener);
        // 阻塞自己
        if (this.client.exists(lockPath)) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 取消註冊
        client.unsubscribeDataChanges(lockPath, listener);
    }

    @Override
    public void lockInterruptibly() {
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

咱們在寫個測試類試一下效果,通過多執行緒來模擬多例項競爭鎖

public static void main(String[] args) {
    // 併發數
    int currency = 50;
    // 迴圈屏障
    final CyclicBarrier cb = new CyclicBarrier(currency);
    // 多執行緒模擬高併發
    for (int i = 0; i < currency; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "---------我準備好---------------");
            // 等待一起出發
            try {
                cb.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            ZKDistributeLock lock = new ZKDistributeLock("/distLock11");
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + " 獲得鎖!");
                try {
                    Thread.sleep(1000 * 2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " 釋放鎖!");
            }
        }
        ).start();
    }
}

可以看到功能是正常的,但也有個很明顯的問題,就是一旦釋放鎖之後所有的例項(執行緒)都會收到通知然後去重新競爭鎖,當例項的數量達到一定程度之後,那麼勢必會對zookeeper造成很大的頻寬和效能消耗,嚴重的話可能會把zookeeper叢集搞掛了,這種情況也叫驚群效應,所以只通過順序節點實現分散式鎖還是有一定的問題的,下面咱們再來優化一下。

3.5.3 zookeeper通過臨時順序節點實現布式鎖?

既然通過臨時節點會造成驚群效應,那麼咱們是否能將臨時和順序節點結合起來,通過最小的那個zNode節點來視為獲得鎖的標誌呢?
答案是肯定能的,當釋放鎖時只通知他的下一個節點即可,完美的避免了驚群效應的發生。

原理圖如下

流程圖如下

接著咱們通過程式碼來實現吧

/**
 * @author yinfeng
 */
public class ZKDistributeImproveLock implements Lock {

    /**
     * 利用臨時順序節點來實現分散式鎖
     * 獲取鎖:取排隊號(建立自己的臨時順序節點),然後判斷自己是否是最小號,如是,則獲得鎖;不是,則註冊前一節點的watcher,阻塞等待
     * 釋放鎖:刪除自己建立的臨時順序節點
     */
    private final String lockPath;

    private final ZkClient client;

    private ThreadLocal<String> currentPath = new ThreadLocal<>();

    private ThreadLocal<String> beforePath = new ThreadLocal<>();

    /**
     * 鎖重入計數
     */
    private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();

    public ZKDistributeImproveLock(String lockPath) {
        super();
        this.lockPath = lockPath;
        client = new ZkClient("192.168.10.11:2181");
        client.setZkSerializer(new MyZkSerializer());
        if (!this.client.exists(lockPath)) {
            try {
                this.client.createPersistent(lockPath);
            } catch (ZkNodeExistsException ignored) {

            }
        }
    }

    @Override
    public boolean tryLock() {
    	// 重入則直接返回獲得鎖成功
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 0) {
                this.reentrantCount.set(++count);
                return true;
            }
        }

        if (this.currentPath.get() == null) {
            currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "aaa"));
        }
        // 獲得所有的子節點
        List<String> children = this.client.getChildren(lockPath);

        // 排序list
        Collections.sort(children);

        // 判斷當前節點是否是最小的
        if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
            this.reentrantCount.set(1);
            return true;
        } else {
            // 取到前一個
            // 得到位元組的索引號
            int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
            beforePath.set(lockPath + "/" + children.get(curIndex - 1));
        }
        return false;
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            // 阻塞等待
            waitForLock();
            // 再次嘗試加鎖
            lock();
        }
    }
    
    private void waitForLock() {
        final CountDownLatch cdl = new CountDownLatch(1);
        // 註冊watcher
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("-----監聽到節點被刪除");
                cdl.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };

        client.subscribeDataChanges(this.beforePath.get(), listener);
        // 讓自己阻塞
        if (this.client.exists(this.beforePath.get())) {
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 醒來後,取消watcher
        client.unsubscribeDataChanges(this.beforePath.get(), listener);
    }

    @Override
    public void unlock() {
        // 重入的釋放鎖處理
        if (this.reentrantCount.get() != null) {
            int count = this.reentrantCount.get();
            if (count > 1) {
                this.reentrantCount.set(--count);
                return;
            } else {
                this.reentrantCount.set(null);
            }
        }
        // 刪除節點
        this.client.delete(this.currentPath.get());
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

最後咱們再來測試一下

public static void main(String[] args) {
    // 併發數
    int currency = 50;
    // 迴圈屏障
    final CyclicBarrier cb = new CyclicBarrier(currency);
    // 多執行緒模擬高併發
    for (int i = 0; i < currency; i++) {
        new Thread(() -> {

            System.out.println(Thread.currentThread().getName() + "---------我準備好---------------");
            // 等待一起出發
            try {
                cb.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            ZKDistributeImproveLock lock = new ZKDistributeImproveLock("/distLock");

            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + " 獲得鎖!");
                try {
                    Thread.sleep(1000 * 2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " 釋放鎖!");
            }
        }).start();
    }
}

可以看到功能是正常的,同時在釋放鎖的時候只通知了下一節點,沒有出現驚群效應,非常完美。

4、總結

在微服務和分散式的時代,zookeeper作為協調服務的代表,在面試中很容易被問到,希望大家能掌握這方面的知識,提高自己的核心競爭力,在談薪的時候拿到最高的那個區間。

最後,外出打工不易,希望各位兄弟找到自己心儀的工作,虎年發發發! 也希望兄弟們能關注、點贊、收藏、評論支援一波,非常感謝大家!