SpringBoot+ZooKeeper+ZKUI+Drools 實現應用配置中心及業務規則動態載入
本文目的:
- 使用ZooKeeper作為SpringBoot應用的配置中心
- 應用中使用到的業務規則儲存在Zookeeper中,規則更新後在不重啟應用的情況下通知應用動態過載規則
1.zookeeper簡介
Zookeeper是一個高效能,分散式的,開源分散式應用協調服務。它提供了簡單原始的功能,分散式應用可以基於它實現更高階的服務,比如同步,配置管理,叢集管理,名稱空間。它被設計為易於程式設計,使用檔案系統目錄樹作為資料模型。服務端跑在java上,並且提供java和C的客戶端API。
資料模型:
特點:
- 採用樹形結構,每個節點叫Znode,節點路徑已/分隔,如:/zoo/foo,每個節點路徑必須唯一,且沒有相對路徑
- 每個Znode都可以儲存資料,資料型別byte[],都可以有子節點
- 每個Znode都有一個stat資料結構來儲存資料的版本,ACL及時間戳等
- 每個Znode都可以進行CRWD操作
由此可見,採用zookeeper可以很方便的管理應用的配置,如:名為cyzy-gpserver的springboot應用,可以將配置儲存在:
- /cyzy-gpserver ## 等同 application.yml
- /cyzy-gpserver,dev ## 等同 application-dev.yml
- /cyzy-gpserver,prod ## 等同 application-prod.yml
這些節點下,每個節點下再分別儲存相關配置項,如:
2.zookeeper及其ui客戶端安裝
zookeeper:
修改如下配置,zookeeper-3.4.6\conf\zoo.cfg:
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=D:\dev\zookeeper-3.4.6\datas
執行:
- windows:zookeeper-3.4.6\bin\zkServer.cmd
- linux:zookeeper-3.4.6\bin\zkServer.sh
ui客戶端zkui:
登陸使用者名稱/密碼配置,zkui\config.cfg:
userSet = {"users": [{ "username":"admin" , "password":"admin","role": "ADMIN" }
執行:
java -jar xxx.jar
3.與SpringBoot整合
pom:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-config</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
src\main\resources下的bootstrap.yml:
spring:
application:
name: cyzy-gpserver
profiles:
active: prod
cloud:
zookeeper:
enabled: true # true:開啟zookeeper外部化配置, false:讀取本地配置; 需要將config.enabled,config.watcher.enabled同時設定
connect-string: 127.0.0.1:2181
config:
enabled: true
watcher:
enabled: false
這樣預設情況下應用會去/config/application,prod 和 /config/cyzy-gpserver,prod節點下讀取配置
4.業務規則動態載入
基本思路:
1.規則儲存在zookeeper中,按照不同的Znode進行分組,如:某類規則儲存在/drools.rules/group1下, 某類儲存在/drools.rules/group2
2.每個儲存了規則的Znode下建立一個“狀態節點”和一個“結果反饋節點”,如:
/drools.rules/group1/a_push_node, /drools.rules/group1/a_result_node
3.應用使用curator客戶端的TreeCacheListener方式來監控/drools.rules下所有”狀態節點”的變化,根據其狀態值,進行相應操作。如:在/drools.rules/group1下修改了某個規則節點,此時設定a_push_node值為0,則應用過載/drools.rules/group1下的規則進行編譯測試;設定為1,應用過載規則。應用操作結果反饋回result_node。
4.應用在使用時可以按照Znode的路徑來fire rules,如:fireRules(String ruleInZKPath, String agendaGroup, AgendaFilter filter, Object… facts)
這樣就可以實現一個輕量級的簡易的規則管理和動態過載規則的系統……
整合Drools:
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-core</artifactId>
<version>6.4.0.Final</version>
</dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
<version>6.4.0.Final</version>
</dependency>
動態載入規則服務:
/**
* 可動態載入Drools規則的服務,載入方式:
* 1.從本地檔案載入,通過http get:/app/drools/reload 或 /app/drools/test 過載規則
* 2.從遠端zookeeper節點上載入,通過設定節點下a_push_node值的方式過載規則:0,測試模式;非0,正式載入
* @ClassName: DroolsService
*/
@Service
@RestController
public class DroolsService{
private static final Logger LOG = LoggerFactory.getLogger(DroolsService.class);
private static final String GROUP_NAME = "com.cyzy";
private static final String VERSION = "1.0.0";
private static final String ZK_ENABLED = "spring.cloud.zookeeper.enabled";
private static final String ZK_PRIFEX_NODE_PATH = "/drools.rules.test";
private static final String ZK_NODE_TEST_VALUE = "0";
private static final String PRIFEX_DIR="drools.prefixDir";
private static final String TARGET_DIR="drools.targetDir";
private static final String RELOAD_RULES_OK = "reload drools's rules ok......";
@Autowired
private Environment env;
@Autowired
private ApplicationContext appCtx;
private CuratorFramework client;
private KieServices kieServices;
private KieRepository repository;
private ConcurrentHashMap<String, KieContainer> kieContainers = new ConcurrentHashMap<String, KieContainer>();
@PostConstruct
protected void initKieContainer() throws Exception {
kieServices = KieServices.Factory.get();
repository = kieServices.getRepository();
boolean isZKEnabled = env.getProperty(ZK_ENABLED, Boolean.class);
if (isZKEnabled) {
client = appCtx.getBean(CuratorFramework.class);
loadRulesFromZK();
} else {
loadConfigFromLocalFile();
}
}
/**
* 從ZK載入規則
* @Title: loadRulesFromZK
* @throws Exception
* @return: void
*/
private void loadRulesFromZK() throws Exception{
ZkNodeListenerAdapter.getInstance()
//設定watcher
.watcher(new ZKNodeWatcher(ZK_PRIFEX_NODE_PATH, client))
//預設處理器
.setDefaultHandler(new ZkNodeHandler() {
@Override
public String execute(CuratorFramework client, TreeCache tc, TreeCacheEvent event, ChildData targetChildData,
Map<String, ChildData> filterChildren) {
String msg = RELOAD_RULES_OK;
try {
String pathName = genPathName(event.getData().getPath());
loadRules(pathName, getRulesFromZKNodes(pathName, filterChildren));
} catch (Exception e) {
msg = e.getMessage();
LOG.error(msg);
}
return msg;
}
})
//a_push_node節點值為0時的處理器
.addHandler(ZK_NODE_TEST_VALUE, new ZkNodeHandler() {
@Override
public String execute(CuratorFramework client, TreeCache tc, TreeCacheEvent event, ChildData targetChildData,
Map<String, ChildData> filterChildren) {
String msg = RELOAD_RULES_OK;
try {
String pathName = genPathName(event.getData().getPath());
testRules(pathName, getRulesFromZKNodes(pathName, filterChildren));
} catch (Exception e) {
msg = e.getMessage();
LOG.error(msg);
}
return msg;
}
})
//增加此監聽器到watcher
.addToWatcher(ZK_PRIFEX_NODE_PATH+"/gps/zte")
.addToWatcher(ZK_PRIFEX_NODE_PATH+"/gps/zte2")
.addToWatcher(ZK_PRIFEX_NODE_PATH+"/group1")
.addToWatcher(ZK_PRIFEX_NODE_PATH+"/group2")
//啟動監聽
.start();
}
/**
* 從本地檔案載入規則
* @Title: loadConfigFromLocalFile
* @throws Exception
* @return: void
*/
private void loadConfigFromLocalFile() throws Exception{
String targetDir = env.getProperty(TARGET_DIR);
loadRules(targetDir, getRulesFromLocalFile(targetDir));
}
/**
* 正式載入規則檔案進行使用
*/
private void loadRules(String ruleInZKPath, List<ResourceWrapper> resourceWrappers) throws Exception {
// if failed throws Exception
ReleaseId releaseId = kieServices.newReleaseId(GROUP_NAME, ruleInZKPath, VERSION);
InternalKieModule kieModule = DroolsUtils.createKieModule(kieServices, releaseId, resourceWrappers);
// if succeed will add new module
repository.addKieModule(kieModule);
KieContainer kieContainer = kieServices.newKieContainer(releaseId);
kieContainer.updateToVersion(releaseId);
kieContainers.put(ruleInZKPath, kieContainer);
}
/**
* 測試載入規則檔案
*/
private void testRules(String ruleInZKPath, List<ResourceWrapper> resourceWrappers) throws Exception{
ReleaseId releaseId = kieServices.newReleaseId(GROUP_NAME, ruleInZKPath, VERSION);
DroolsUtils.createKieModule(kieServices, releaseId, resourceWrappers);
}
/**
* 從ENV配置的指定資料夾中獲取規則檔案
*/
private List<ResourceWrapper> getRulesFromLocalFile(String path) {
String prefixDir = env.getProperty(PRIFEX_DIR);
String targetDir = env.getProperty(TARGET_DIR);
List<ResourceWrapper> resourceWrappers = new ArrayList<ResourceWrapper>();
List<File> ruleFiles = new ArrayList<File>();
try {
ruleFiles = DroolsUtils.getDefaultRuleFiles(prefixDir, targetDir);
} catch (Exception e) {}
if(ruleFiles.isEmpty()){
throw new RuntimeException("can't load rules from " + prefixDir +"/"+ targetDir);
}
LOG.info("################################load ["+path+"] rules################################");
for (File file : ruleFiles) {
LOG.info(file.getName());
resourceWrappers.add(new ResourceWrapper(ResourceFactory.newFileResource(file), file.getName()));
}
LOG.info("################################load ["+path+"] rules################################");
return resourceWrappers;
}
/**
* 從ZK節點上獲取規則檔案
*/
private List<ResourceWrapper> getRulesFromZKNodes(String path, Map<String, ChildData> filterChildren) {
List<ResourceWrapper> resourceWrappers = new ArrayList<ResourceWrapper>();
LOG.info("################################load ["+path+"] rules################################");
for (Entry<String, ChildData> entry : filterChildren.entrySet()) {
LOG.info(entry.getKey());
resourceWrappers.add(
new ResourceWrapper(ResourceFactory.newByteArrayResource(entry.getValue().getData()),entry.getKey()));
}
LOG.info("################################load ["+path+"] rules################################");
return resourceWrappers;
}
private String genPathName(String zkNodePath) {
String name = "";
Matcher matcher = Pattern.compile("(.*?)/a_push_node").matcher(zkNodePath);
if (matcher.matches()) {
name = matcher.group(1);
}
return name;
}
/**
* 建立指定ZK節點路徑下的KieSession
* @Title: newSession
* @Description: TODO
* @param ruleInZKPath
* @return: KieSession
*/
private KieSession newSession(String ruleInZKPath) {
KieContainer kieContainer = kieContainers.get(ruleInZKPath);
if(kieContainer==null){
throw new RuntimeException("can't get KieContainer with the name:" + ruleInZKPath);
}
KieSession session = kieContainer.newKieSession();
// 預設配置
session.setGlobal("appCtx", appCtx);
return session;
}
/**
* 執行指定ZK節點路徑下的所有“MAIN”議程組(使用agenda-group定義,預設是MAIN)的規則
* @Title: fireAllRules
* @param ruleInZKPath
* @param facts
* @return: void
*/
public void fireMainGroupRules(String ruleInZKPath, Object... facts) {
fireRules(ruleInZKPath, null, null, facts);
}
/**
* 執行指定ZK節點路徑下的所有“MAIN”議程組(使用agenda-group定義,預設是MAIN)的並且經過AgendaFilter過濾的規則
* @Title: fireAllRules
* @param ruleInZKPath
* @param filter
* @param facts
* @return: void
*/
public void fireMainGroupRules(String ruleInZKPath, AgendaFilter filter, Object... facts) {
fireRules(ruleInZKPath, null, filter, facts);
}
/**
* 執行指定ZK節點路徑下的所有agendaGroup指定議程組和“MAIN”議程組(使用agenda-group定義,預設是MAIN)的並且經過AgendaFilter過濾的規則
* @Title: fireAllRules
* @param ruleInZKPath 規則所在ZK node的路徑
* @param agendaGroup 規則所在議程組名稱,如果不定義預設:MAIN
* @param filter 規則過濾器
* @param facts 事實
* @return: void
*/
public void fireRules(String ruleInZKPath, String agendaGroup, AgendaFilter filter, Object... facts) {
KieSession session = null;
try {
session = newSession(ruleInZKPath);
// add fact
for (Object fact : facts) {
session.insert(fact);
}
//focus agenda group
if (agendaGroup != null && !agendaGroup.isEmpty()) {
session.getAgenda().getAgendaGroup(agendaGroup).setFocus();
}
// add filter
if (filter != null) {
session.fireAllRules(filter);
} else {
session.fireAllRules();
}
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
} finally {
if (session != null) {
session.dispose();
}
}
}
@RequestMapping(value = "/app/drools/reload", method = RequestMethod.GET)
public @ResponseBody ResponseEntity<String> reloadRules() {
String msg = RELOAD_RULES_OK;
try {
String targetDir = env.getProperty(TARGET_DIR);
loadRules(targetDir, getRulesFromLocalFile(targetDir));
} catch (Exception e) {
msg = e.getMessage();
}
return ResponseEntity.ok().body(msg);
}
@RequestMapping(value = "/app/drools/test", method = RequestMethod.GET)
public @ResponseBody ResponseEntity<String> testRules() {
String msg = RELOAD_RULES_OK;
try {
String targetDir = env.getProperty(TARGET_DIR);
testRules(targetDir, getRulesFromLocalFile(targetDir));
} catch (Exception e) {
msg = e.getMessage();
}
return ResponseEntity.ok().body(msg);
}
@RequestMapping(value = "/app/drools/test2", method = RequestMethod.GET)
public @ResponseBody ResponseEntity<String> test2() {
String msg = "test ok";
try {
fireRules(ZK_PRIFEX_NODE_PATH + "/group1", "group1", null, new Object());
fireRules(ZK_PRIFEX_NODE_PATH + "/group2", "", null, new Object());
fireRules(ZK_PRIFEX_NODE_PATH + "/gps/zte", "gps.zte", null, new Object());
fireRules(ZK_PRIFEX_NODE_PATH + "/gps/zte2", "gps.zte", null, new Object());
// fireAllRules("rules", null, null, new Object());
} catch (Exception e) {
msg = e.getMessage();
}
return ResponseEntity.ok().body(msg);
}
}