Implementing a Distributed Configuration Center with ZooKeeper

Personal notes

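Preface:
ZooKeeper has five main use cases:
1. Master management, e.g. for AMQ clusters and Kafka clusters.
2. Distributed locks (pessimistic and optimistic).
3. Distributed configuration center.
4. Cluster monitoring.
5. Publish/subscribe (queues).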

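All five follow from ZooKeeper's own characteristics. ZooKeeper has two kinds of nodes:
1. Ephemeral nodes (optionally sequential).
2. Persistent nodes (optionally sequential).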

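On top of that, ZooKeeper provides watch events on nodes (node deleted, created, or modified), so we can run the corresponding business logic whenever a watched node changes. All five use cases above are built this way.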
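
To make the watch idea concrete, here is a minimal in-memory sketch of "change a node, every listener gets an event". It is not the ZooKeeper or Curator API: `WatchedNodes`, `Listener`, and `WatchedNodesDemo` are hypothetical names, and only the event names are borrowed from the `TreeCacheEvent` types used later in these notes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory stand-in for a watched subtree; not the ZooKeeper API.
class WatchedNodes {
    enum EventType { NODE_ADDED, NODE_UPDATED, NODE_REMOVED }

    interface Listener {
        void onEvent(EventType type, String path, String data);
    }

    private final Map<String, String> nodes = new ConcurrentHashMap<>();
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Creates or updates a node, then notifies every registered listener.
    void set(String path, String data) {
        String previous = nodes.put(path, data);
        fire(previous == null ? EventType.NODE_ADDED : EventType.NODE_UPDATED, path, data);
    }

    // Removes a node; listeners are notified only if the node existed.
    void delete(String path) {
        if (nodes.remove(path) != null) {
            fire(EventType.NODE_REMOVED, path, null);
        }
    }

    private void fire(EventType type, String path, String data) {
        for (Listener listener : listeners) {
            listener.onEvent(type, path, data);
        }
    }
}

class WatchedNodesDemo {
    // Records the events produced by one create, one update, and one delete.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        WatchedNodes nodes = new WatchedNodes();
        nodes.addListener((type, path, data) -> log.add(type + " " + path));
        nodes.set("/jdbc/url", "jdbc:mysql://localhost:3306/a");
        nodes.set("/jdbc/url", "jdbc:mysql://localhost:3306/b");
        nodes.delete("/jdbc/url");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the real setup the "business logic" in the listener would be whatever the use case needs: re-reading configuration, trying to take a lock, or electing a new master.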

Other notes: ZooKeeper elects its leader through the ZAB protocol, and ordering is guaranteed by the zxid. Look up the other core algorithms yourself.
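
As a small illustration of why the zxid gives an ordering: a zxid is a 64-bit number whose high 32 bits are the leader epoch and whose low 32 bits are a counter within that epoch, so comparing two zxids as plain numbers orders the transactions. The sketch below only splits a value into those two halves; the class name `Zxid` and the sample value are made up for the example.

```java
// Splits a zxid into its epoch (high 32 bits) and counter (low 32 bits).
class Zxid {
    static long epoch(long zxid) {
        return zxid >>> 32;
    }

    static long counter(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        long zxid = 0x200000005L; // hypothetical value: epoch 2, counter 5
        System.out.println("epoch=" + epoch(zxid) + " counter=" + counter(zxid));
    }
}
```

Applied to the `cZxid = 0x3` shown in the zkCli output further down, this gives epoch 0 and counter 3.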

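A brief explanation of the principle, taking a JDBC database connection pool as the example. We normally put the configuration in a properties file as key=value pairs and have it loaded dynamically into the XML. (How that works: the XML is read into memory as a stream, the properties file is read into memory as a stream, and each ${} expression is cut out of the string and replaced with the matching value.)

With ZooKeeper, we can instead read the database connection settings (uname, upass, url, driver) from nodes on a remote ZooKeeper server. Once we have them, we obtain the datasource bean from the Spring application context in Java and call its setters to apply the connection settings read from the ZooKeeper nodes. We can also watch these nodes (watcher): when the configuration changes, every client watching the nodes is notified and can apply the new connection settings to its datasource.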
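
The ${} substitution step can be sketched in a few lines. This is not Spring's implementation, just a minimal version of the same idea under my own assumptions: `PlaceholderSketch` and `resolve` are hypothetical names, and unknown keys are left untouched, similar in spirit to the `ignoreUnresolvablePlaceholders` setting used in the XML below.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {
    // Matches ${key} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${key} with its value from props; unknown keys stay as they are.
    static String resolve(String text, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = props.getProperty(matcher.group(1));
            String replacement = value != null ? value : matcher.group(0);
            // quoteReplacement keeps '$' and '\' in values from being read as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("url", "jdbc:mysql://localhost:3306/miaoshatest");
        props.setProperty("uname", "root");
        System.out.println(resolve("jdbcUrl=${url} username=${uname} password=${upass}", props));
    }
}
```

The only thing the ZooKeeper variant changes is where `props` comes from: node data instead of a properties file.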

Code:

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd  
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"
>
    <context:annotation-config />
    <context:component-scan base-package="com.dongnao.demo" />

    <bean id="zkCentralConfigurer" class="com.dongnao.demo.config.ZookeeperCentralConfigurer">
        <constructor-arg name="zkServers" value="localhost:2181" />
        <constructor-arg name="sessionTimeout" value="1000" />
        <constructor-arg name="zkPath" value="/jdbc" />
    </bean>

    <bean id="zkPlaceholderConfigurer" class="com.dongnao.demo.config.ZookeeperPlaceholderConfigurer">
        <property name="zkCentralConfigurer" ref="zkCentralConfigurer" />
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="order" value="1" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource">
            <ref bean="dataSource" />
        </property>
    </bean>

    <!-- close() is used as the destroy method; older HikariCP versions also offered shutdown() -->
    <bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource" destroy-method="close">
        <property name="driverClassName" value="${driver}" />
        <property name="jdbcUrl" value="${url}" />
        <property name="username" value="${uname}" />
        <property name="password" value="${upass}" />
        <!-- Set to true when connecting to a read-only database, for safety -->
        <property name="readOnly" value="false" />
        <!-- Maximum time (ms) to wait for the pool to hand out a connection; if none becomes available in time an SQLException is thrown. Default: 30 seconds -->
        <property name="connectionTimeout" value="30000" />
        <!-- Maximum time (ms) a connection may stay idle before it is retired. Default: 10 minutes -->
        <property name="idleTimeout" value="600000" />
        <!-- Maximum lifetime (ms) of a connection; once exceeded and not in use it is retired. Default: 30 minutes. Recommended: 30 seconds shorter than the database timeout; see the MySQL wait_timeout parameter (show variables like '%timeout%';) -->
        <property name="maxLifetime" value="1800000" />
        <!-- Maximum number of connections in the pool. Default: 10. Recommended formula: ((core_count * 2) + effective_spindle_count) -->
        <property name="maximumPoolSize" value="15" />
    </bean>
</beans>

ZookeeperCentralConfigurer.java

package com.dongnao.demo.config;

import java.util.List;
import java.util.Properties;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;

import com.zaxxer.hikari.HikariDataSource;

public class ZookeeperCentralConfigurer {
    private CuratorFramework zkClient;
    private TreeCache treeCache;

    private String zkServers;
    private String zkPath;
    private int sessionTimeout;
    private Properties props;

    public ZookeeperCentralConfigurer(String zkServers, String zkPath, int sessionTimeout) {
        this.zkServers = zkServers;
        this.zkPath = zkPath;
        this.sessionTimeout = sessionTimeout;
        this.props = new Properties();

        initZkClient();
        getConfigData();
        addZkListener();
    }

    private void initZkClient() {
        zkClient = CuratorFrameworkFactory.builder().connectString(zkServers).sessionTimeoutMs(sessionTimeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        zkClient.start();
    }

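    // Loads every child of zkPath into props: child name = key, node data = value.
    private void getConfigData() {
        try {
            List<String> list = zkClient.getChildren().forPath(zkPath);
            for (String key : list) {
                byte[] data = zkClient.getData().forPath(zkPath + "/" + key);
                // A node may hold no data; decode as UTF-8 rather than the platform default charset.
                if (data != null && data.length > 0) {
                    props.put(key, new String(data, java.nio.charset.StandardCharsets.UTF_8));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }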

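    // Watches the subtree under zkPath and pushes URL changes to the datasource.
    private void addZkListener() {
        TreeCacheListener listener = new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
                    getConfigData();
                    WebApplicationContext ctx = ContextLoader.getCurrentWebApplicationContext();
                    if (ctx == null) {
                        return; // no web application context available (yet)
                    }
                    HikariDataSource dataSource = (HikariDataSource) ctx.getBean("dataSource");
                    System.out.println("================" + props.getProperty("url"));
                    // Only the URL is pushed here. Depending on the HikariCP version, changing it
                    // on a pool that is already running may be rejected or have no effect.
                    dataSource.setJdbcUrl(props.getProperty("url"));
                }
            }
        };

        treeCache = new TreeCache(zkClient, zkPath);
        try {
            // Register the listener before starting the cache so no early event is missed.
            treeCache.getListenable().addListener(listener);
            treeCache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }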

    public Properties getProps() {
        return props;
    }

    public void setZkServers(String zkServers) {
        this.zkServers = zkServers;
    }

    public void setZkPath(String zkPath) {
        this.zkPath = zkPath;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }
}
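
One thing to keep in mind about the listener in this class: it calls a setter on a pool that is already running, and whether a running pool picks up such a change depends on the pool and its version. A possible alternative, sketched below under my own assumptions, is to build a fresh resource from the new properties and swap it in atomically. `ReloadableResource`, `ReloadDemo`, and `FakePool` are hypothetical names; `FakePool` stands in for a real connection pool so the example runs without a database.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical holder: rebuilds a resource from fresh properties and swaps it in atomically.
class ReloadableResource<T> {
    private final Function<Properties, T> factory;
    private final Consumer<T> disposer;
    private final AtomicReference<T> current;

    ReloadableResource(Function<Properties, T> factory, Consumer<T> disposer, Properties initial) {
        this.factory = factory;
        this.disposer = disposer;
        this.current = new AtomicReference<>(factory.apply(initial));
    }

    T get() {
        return current.get();
    }

    // Meant to be called from the change listener; the old instance is disposed after the swap.
    void reload(Properties updated) {
        T old = current.getAndSet(factory.apply(updated));
        disposer.accept(old);
    }
}

class ReloadDemo {
    // Stand-in for a connection pool; a real setup would build a DataSource here.
    static class FakePool {
        final String url;
        boolean closed;

        FakePool(String url) {
            this.url = url;
        }

        void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("url", "jdbc:mysql://localhost:3306/a");
        ReloadableResource<FakePool> holder = new ReloadableResource<>(
                p -> new FakePool(p.getProperty("url")), FakePool::close, props);

        FakePool first = holder.get();
        props.setProperty("url", "jdbc:mysql://localhost:3306/b");
        holder.reload(props);
        System.out.println(holder.get().url + " oldClosed=" + first.closed);
    }
}
```

Callers would have to go through `holder.get()` each time rather than caching the instance, and a real version would probably delay disposing the old pool until in-flight work on it has finished.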

ZookeeperPlaceholderConfigurer.java

package com.dongnao.demo.config;

import java.util.Properties;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;

public class ZookeeperPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
    private ZookeeperCentralConfigurer zkCentralConfigurer;

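    // Ignores the locally loaded props and resolves ${...} placeholders
    // from the values read from ZooKeeper instead.
    @Override
    protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
            throws BeansException {
        super.processProperties(beanFactoryToProcess, zkCentralConfigurer.getProps());
    }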

    public void setZkCentralConfigurer(ZookeeperCentralConfigurer zkCentralConfigurer) {
        this.zkCentralConfigurer = zkCentralConfigurer;
    }
}

ZooKeeper client (zkCli) commands:

[zk: localhost:2181(CONNECTED) 2] create /jdbc db
Created /jdbc
[zk: localhost:2181(CONNECTED) 2] create /jdbc/url jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
Created /jdbc/url
[zk: localhost:2181(CONNECTED) 3] create /jdbc/url jdbc.mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
Node already exists: /jdbc/url
[zk: localhost:2181(CONNECTED) 4] get /jdbc/url
jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
cZxid = 0x3
ctime = Tue Mar 13 15:46:46 CST 2018
mZxid = 0x3
mtime = Tue Mar 13 15:46:46 CST 2018
pZxid = 0x3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 79
numChildren = 0
[zk: localhost:2181(CONNECTED) 5] set /jdbc/url jdbc:mysql://localhost:3306/miaoshatest?useUnicode=true&characterEncoding=UTF-8
[zk: localhost:2181(CONNECTED) 11] create /jdbc/uname root
Created /jdbc/uname
[zk: localhost:2181(CONNECTED) 12]  create /jdbc/driver com.mysql.jdbc.Driver
Created /jdbc/driver
[zk: localhost:2181(CONNECTED) 13] create /jdbc/upass agui
Created /jdbc/upass

Then just start Tomcat.