(六)、ZooKeeper自動重連
在一套分散式的online services系統中,各service通常不會放在一臺伺服器上,而是通過Zookeeper這樣的東西,將自己的service資訊註冊到上面,service的使用者通過Zookeeper來發現各service的資訊,從而可以將request傳送到不同的service上去處理。
如上圖所示,兩個Service Provider 1和2分別在192.168.1.5和192.168.1.6這兩臺伺服器的2688埠上提供服務,服務的地址和埠註冊到了Zookeeper中。Service User通過查詢Zookeeper,可得知這些服務的資訊。通常,Service User與Service Provider之間的通訊,是通過connection pool實現的,因為Service User不可能假定在第一次查詢到所有Service Provider的資訊之後,它們就是一直存活的,假如某個Service
Provider因為程式問題死掉了,向它傳送request只會造成大量的失敗結果,因此通常會實現一個connection pool來保證實時更新節點的資訊,當有一個Service Provider從Zookeeper上消失之後,從connection pool中取出的connection總是可用的(即:總能通過它把request傳送到一個有效的Service Provider那裡)。
程式碼如下:
package com.lyh.zk; import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; import com.lyh.common.ZKCommon; /** * ZooKeeper監聽 * @author liuyuehu */ public class MyZooKeeper { Logger logger = Logger.getLogger(MyZooKeeper.class); protected CountDownLatch countDownLatch = new CountDownLatch(1); public static ZooKeeper zooKeeper = null; private Object waiter = new Object(); /** * 監控所有被觸發的事件 */ public void process(WatchedEvent event) { logger.info("收到事件通知:" + event.getState() ); if(event.getState()==KeeperState.SyncConnected){ countDownLatch.countDown(); } } /** * <p>連線Zookeeper</p> * 啟動zk服務 本例項基於自動重連策略,如果zk連線沒有建立成功或者在執行時斷開,將會自動重連. * @param connectString Zookeeper服務地址 * @param sessionTimeout Zookeeper連線超時時間 */ public void connect(){ try { synchronized (waiter) { SessionWatcher watcher = new SessionWatcher(); if(zooKeeper == null){ // ZK客戶端允許我們將ZK伺服器的所有地址都配置在這裡 zooKeeper = new ZooKeeper(ZKCommon.connectAddress,ZKCommon.sessionTimeout,watcher); // 使用CountDownLatch.await()的執行緒(當前執行緒)阻塞直到所有其它擁有 //CountDownLatch的執行緒執行完畢(countDown()結果為0) countDownLatch.await(); } } } catch (IOException e) { logger.error("連線建立失敗,發生 InterruptedException , e " + e.getMessage(), e); } catch (InterruptedException e) { logger.error( "連線建立失敗,發生 IOException , e " + e.getMessage(), e ); } waiter.notifyAll(); } /** * 關閉連線 */ public void close(){ try { synchronized (waiter) { if(zooKeeper != null){ zooKeeper.close(); } waiter.notifyAll(); } } catch (InterruptedException e) { logger.error("release connection error ," + e.getMessage() ,e); } } class SessionWatcher implements Watcher { public void process(WatchedEvent event) { // 如果是“資料變更”事件 if (event.getType() != Event.EventType.None) { return; } synchronized (waiter){ switch(event.getState()) { case SyncConnected: //zk連線建立成功,或者重連成功 waiter.notifyAll(); logger.info("Connected..."); break; case Expired: // session過期,這是個非常嚴重的問題,有可能client端出現了問題,也有可能zk環境故障 // 此處僅僅是重新例項化zk client logger.info("Expired(重連)..."); connect(); break; case Disconnected: logger.info("連結斷開,或session遷移...."); break; case AuthFailed: close(); throw new RuntimeException("ZK Connection auth failed..."); default: break; } } } } }
參考地址網頁:http://iwinit.iteye.com/blog/1844177
http://www.codelast.com/%e5%8e%9f%e5%88%9b-zookeeper%e6%b3%a8%e5%86%8c%e8%8a%82%e7%82%b9%e7%9a%84%e6%8e%89%e7%ba%bf%e8%87%aa%e5%8a%a8%e9%87%8d%e6%96%b0%e6%b3%a8%e5%86%8c%e5%8f%8a%e6%b5%8b%e8%af%95%e6%96%b9%e6%b3%95/