Zookeeper 實現分散式配置中心
個人備忘
前言:
zookeeper 五個功能點
1. master的管理,如amq 叢集,kafka叢集。
2. 分散式鎖(悲觀、樂觀)
3. 分散式配置中心。
4. 叢集的監管。
5. 釋出與訂閱(佇列)。
以上五點,都是zookeeper的特性決定的,我們知道zookeeper有兩類節點:
1. 臨時節點。(可順序)
2. 永久節點。(可順序)
再加上zookeeper提供了,對節點的監聽事件(刪除,新增,修改–針對節點),使得我們可以通過節點的監聽來做相對應的業務邏輯,以上五點均是這樣。
其他:zookeeper的leader 的選舉是通過 ZAB 進行的,保證有序性是靠:zxid 。其他核心演算法,自行科補 。
簡單說下原理,已資料庫連線池jdbc 為例,我們通常將配置資訊配置到我們的property檔案中,用key=value 來進行配置,動態載入到我們的xml 中(原理:xml 讀流到記憶體,property讀流到記憶體,${}表示式作為String 的切割,來動態新增)。我們可以通過zookeeper對節點的監聽,來獲取配置在遠端zookeeper服務節點中的“資料庫連線等資訊”如:uname ,pass ,url ,driver 。獲取到之後,我們通過Spring 的上下文物件,在java中動態獲取 datasource 這個bean , 通過datasource 物件的set 方法,將zookeeper節點獲取到的連線資訊設定到datasource 物件中。我們還可以通過對這些節點的監聽(watcher),當配置資訊變化後,可以通過 所有的client 對節點的監聽來完成對datasource 配置新的連線資訊。
程式碼展示 :
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd" >
<context:annotation-config />
<context:component-scan base-package="com.dongnao.demo" />
<bean id="zkCentralConfigurer" class="com.dongnao.demo.config.ZookeeperCentralConfigurer">
<constructor-arg name="zkServers" value="localhost:2181" />
<constructor-arg name="sessionTimeout" value="1000" />
<constructor-arg name="zkPath" value="/jdbc" />
</bean>
<bean id="zkPlaceholderConfigurer" class="com.dongnao.demo.config.ZookeeperPlaceholderConfigurer">
<property name="zkCentralConfigurer" ref="zkCentralConfigurer" />
<property name="ignoreUnresolvablePlaceholders" value="true" />
<property name="order" value="1" />
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource">
<ref bean="dataSource" />
</property>
</bean>
<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource"
destroy-method="shutdown">
<property name="driverClassName" value="${driver}" />
<property name="jdbcUrl" value="${url}" />
<property name="username" value="${uname}" />
<property name="password" value="${upass}" />
<!-- 連線只讀資料庫時配置為true, 保證安全 -->
<property name="readOnly" value="false" />
<!-- 等待連線池分配連線的最大時長(毫秒),超過這個時長還沒可用的連線則發生SQLException, 預設:30秒 -->
<property name="connectionTimeout" value="30000" />
<!-- 一個連線idle狀態的最大時長(毫秒),超時則被釋放(retired),預設:10分鐘 -->
<property name="idleTimeout" value="600000" />
<!-- 一個連線的生命時長(毫秒),超時而且沒被使用則被釋放(retired),預設:30分鐘,建議設定比資料庫超時時長少30秒,參考MySQL
wait_timeout引數(show variables like '%timeout%';) -->
<property name="maxLifetime" value="1800000" />
<!-- 連線池中允許的最大連線數。預設值:10;推薦的公式:((core_count * 2) + effective_spindle_count) -->
<property name="maximumPoolSize" value="15" />
</bean>
</beans>
ZookeeperCentralConfigurer .java
package com.dongnao.demo.config;
import java.util.List;
import java.util.Properties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;
import com.zaxxer.hikari.HikariDataSource;
public class ZookeeperCentralConfigurer {
private CuratorFramework zkClient;
private TreeCache treeCache;
private String zkServers;
private String zkPath;
private int sessionTimeout;
private Properties props;
public ZookeeperCentralConfigurer(String zkServers, String zkPath, int sessionTimeout) {
this.zkServers = zkServers;
this.zkPath = zkPath;
this.sessionTimeout = sessionTimeout;
this.props = new Properties();
initZkClient();
getConfigData();
addZkListener();
}
private void initZkClient() {
zkClient = CuratorFrameworkFactory.builder().connectString(zkServers).sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
zkClient.start();
}
private void getConfigData() {
try {
List<String> list = zkClient.getChildren().forPath(zkPath);
for (String key : list) {
String value = new String(zkClient.getData().forPath(zkPath + "/" + key));
if (value != null && value.length() > 0) {
props.put(key, value);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void addZkListener() {
TreeCacheListener listener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
getConfigData();
WebApplicationContext ctx = ContextLoader.getCurrentWebApplicationContext();
HikariDataSource dataSource = (HikariDataSource) ctx.getBean("dataSource");
System.out.println("================"+props.getProperty("url"));
dataSource.setJdbcUrl(props.getProperty("url"));
}
}
};
treeCache = new TreeCache(zkClient, zkPath);
try {
treeCache.start();
treeCache.getListenable().addListener(listener);
} catch (Exception e) {
e.printStackTrace();
}
}
public Properties getProps() {
return props;
}
public void setZkServers(String zkServers) {
this.zkServers = zkServers;
}
public void setZkPath(String zkPath) {
this.zkPath = zkPath;
}
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
}
ZookeeperPlaceholderConfigurer.java
package com.dongnao.demo.config;
import java.util.Properties;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
public class ZookeeperPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
private ZookeeperCentralConfigurer zkCentralConfigurer;
@Override
protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
throws BeansException {
super.processProperties(beanFactoryToProcess, zkCentralConfigurer.getProps());
}
public void setZkCentralConfigurer(ZookeeperCentralConfigurer zkCentralConfigurer) {
this.zkCentralConfigurer = zkCentralConfigurer;
}
}
Zk客戶端命令 :
[zk: localhost:2181(CONNECTED) 2]create /jdbc db
Created /jdbc
[zk: localhost:2181(CONNECTED) 2] create /jdbc/url jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
Created /jdbc/url
[zk: localhost:2181(CONNECTED) 3] create /jdbc/url jdbc.mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
Node already exists: /jdbc/url
[zk: localhost:2181(CONNECTED) 4] get /jdbc/url
jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
cZxid = 0x3
ctime = Tue Mar 13 15:46:46 CST 2018
mZxid = 0x3
mtime = Tue Mar 13 15:46:46 CST 2018
pZxid = 0x3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 79
numChildren = 0
[zk: localhost:2181(CONNECTED) 5] set /jdbc/url jdbc.mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
[zk: localhost:2181(CONNECTED) 11] create /jdbc/uname root
Created /jdbc/uname
[zk: localhost:2181(CONNECTED) 12] create /jdbc/driver com.mysql.jdbc.Driver
Created /jdbc/driver
[zk: localhost:2181(CONNECTED) 13] create /jdbc/upass agui
Created /jdbc/upass
啟動tomcat即可。