canal原始碼分析簡介-1
1.0 canal原始碼分析簡介
canal是阿里巴巴開源的mysql資料庫binlog的增量訂閱&消費元件。專案github地址為:https://github.com/alibaba/canal。
本教程是從原始碼的角度來分析canal,適用於對canal有一定基礎的同學。本教程使用的版本是1.0.24,這也是筆者寫這篇教程時的最新穩定版,關於canal的基礎知識可以參考:https://github.com/alibaba/canal/wiki。
下載專案原始碼
下載
- gitclonehttps://github.com/alibaba/canal.git
切換到canal-1.0.24這個tag
- gitcheckoutcanal-1.0.24
原始碼模組劃分
canal是基於maven構建的,總共分成了14個模組,如下所示:
模組雖多,但是每個模組的程式碼都很少。各個模組的作用如下所示:
common模組:主要是提供了一些公共的工具類和介面。
client模組:canal的客戶端。核心介面為CanalConnector
example模組:提供client模組使用案例。
protocol模組:client和server模組之間的通訊協議
deployer:部署模組。通過該模組提供的CanalLauncher來啟動canal server
server模組:canal伺服器端。核心介面為CanalServer
instance模組:一個server有多個instance。每個instance都會模擬成一個mysql例項的slave。instance模組有四個核心組成部分:parser模組、sink模組、store模組,meta模組。核心介面為CanalInstance
parser模組:資料來源接入,模擬slave協議和master進行互動,協議解析。parser模組依賴於dbsync、driver模組。
driver模組和dbsync模組:從這兩個模組的artifactId(canal.parse.driver、canal.parse.dbsync),就可以看出來,這兩個模組實際上是parser模組的元件。事實上parser 是通過driver模組與mysql建立連線,從而獲取到binlog。由於原始的binlog都是二進位制流,需要解析成對應的binlog事件,這些binlog事件物件都定義在dbsync模組中,dbsync 模組來自於淘寶的tddl。
sink模組:parser和store連結器,進行資料過濾,加工,分發的工作。核心介面為CanalEventSink
store模組:資料儲存。核心介面為CanalEventStore
meta模組:增量訂閱&消費資訊管理器,核心介面為CanalMetaManager,主要用於記錄canal消費到的mysql binlog的位置,
下面再通過一張圖來說明各個模組之間的依賴關係
通過deployer模組,啟動一個canal-server,一個cannal-server內部包含多個instance,每個instance都會偽裝成一個mysql例項的slave。client與server之間的通訊協議由protocol模組定義。client在訂閱binlog資訊時,需要傳遞一個destination引數,server會根據這個destination確定由哪一個instance為其提供服務。
在分析原始碼的時候,本人也是按照模組來劃分的,基本上一個模組對應一篇文章。
2.0 deployer模組
canal有兩種使用方式:1、獨立部署 2、內嵌到應用中。 deployer模組主要用於獨立部署canal server。關於這兩種方式的區別,請參見server模組原始碼分析。deployer模組原始碼目錄結構如下所示:
在獨立部署canal時,需要首先對canal的原始碼進行打包
- mvncleaninstall-Dmaven.test.skip-Denv=release
以本教程使用1.0.24版本為例,打包後會在target目錄生成一個以下兩個檔案:
其中canal.deployer-1.0.24.tar.gz就是canal的獨立部署包。解壓縮後,目錄如下所示。其中bin目錄和conf目錄(包括子目錄spring)中的所有檔案,都來自於deployer模組。
- canal
- ├──bin
- │├──startup.bat
- │├──startup.sh
- │└──stop.sh
- ├──conf
- │├──canal.properties
- │├──example
- ││└──instance.properties
- │├──logback.xml
- │└──spring
- │├──default-instance.xml
- │├──file-instance.xml
- │├──group-instance.xml
- │├──local-instance.xml
- │└──memory-instance.xml
- ├──lib
- │└──....依賴的各種jar
- └──logs
deployer模組主要完成以下功能:
1、讀取canal,properties配置檔案
2、啟動canal server,監聽canal client的請求
3、啟動canal instance,連線mysql資料庫,偽裝成slave,解析binlog
4、在canal的執行過程中,監聽配置檔案的變化
1、啟動和停止指令碼
bin目錄中包含了canal的啟動和停止指令碼startup.sh
和stop.sh
,當我們要啟動canal時,只需要輸入以下命令即可
- shbin/startup.sh
在windows環境下,可以直接雙擊startup.bat。
在startup.sh指令碼內,會呼叫com.alibaba.otter.canal.deployer.CanalLauncher類來進行啟動,這是分析Canal原始碼的入口類,如下圖所示:
同時,startup.sh還會在bin目錄下生成一個canal.pid
檔案,用於儲存canal的程序id。當停止canal的時候
- shbin/stop.sh
會根據canal.pid檔案中記錄的程序id,kill掉canal程序,並且刪除這個檔案。
2、CannalLauncher
CanalLauncher
是整個原始碼分析的入口類,程式碼相當簡單。步驟是:
1、讀取canal.properties檔案中的配置
2、利用讀取的配置構造一個CanalController例項,將所有的啟動操作都委派給CanalController進行處理。
3、最後註冊一個鉤子函式,在JVM停止時同時也停止canal server。
com.alibaba.otter.canal.deployer.CanalLauncher
- publicclassCanalLauncher{
- privatestaticfinalStringCLASSPATH_URL_PREFIX="classpath:";
- privatestaticfinalLoggerlogger=LoggerFactory.getLogger(CanalLauncher.class);
- publicstaticvoidmain(String[]args)throwsThrowable{
- try{
- //1、讀取canal.properties檔案中配置,預設讀取classpath下的canal.properties
- Stringconf=System.getProperty("canal.conf","classpath:canal.properties");
- Propertiesproperties=newProperties();
- if(conf.startsWith(CLASSPATH_URL_PREFIX)){
- conf=StringUtils.substringAfter(conf,CLASSPATH_URL_PREFIX);
- properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
- }else{
- properties.load(newFileInputStream(conf));
- }
- //2、啟動canal,首先將properties物件傳遞給CanalController,然後呼叫其start方法啟動
- logger.info("##startthecanalserver.");
- finalCanalControllercontroller=newCanalController(properties);
- controller.start();
- logger.info("##thecanalserverisrunningnow......");
- //3、關閉canal,通過新增JVM的鉤子,JVM停止前會回撥run方法,其內部呼叫controller.stop()方法進行停止
- Runtime.getRuntime().addShutdownHook(newThread(){
- publicvoidrun(){
- try{
- logger.info("##stopthecanalserver");
- controller.stop();
- }catch(Throwablee){
- logger.warn("##somethinggoeswrongwhenstoppingcanalServer:\n{}",
- ExceptionUtils.getFullStackTrace(e));
- }finally{
- logger.info("##canalserverisdown.");
- }
- }
- });
- }catch(Throwablee){
- logger.error("##SomethinggoeswrongwhenstartingupthecanalServer:\n{}",
- ExceptionUtils.getFullStackTrace(e));
- System.exit(0);
- }
- }
- }
可以看到,CanalLauncher實際上只是負責讀取canal.properties配置檔案,然後構造CanalController物件,並通過其start和stop方法來開啟和停止canal。因此,如果說CanalLauncher是canal原始碼分析的入口類,那麼CanalController就是canal原始碼分析的核心類。
3、CanalController
在CanalController的構造方法中,會對配置檔案內容解析,初始化相關成員變數,做好canal server的啟動前的準備工作,之後在CanalLauncher中呼叫CanalController.start方法來啟動。
CanalController中定義的相關欄位和構造方法,如下所示:
- publicclassCanalController{
- privatestaticfinalLoggerlogger=LoggerFactory.getLogger(CanalController.class);
- privateLongcid;
- privateStringip;
- privateintport;
- //預設使用spring的方式載入
- privateMap<String,InstanceConfig>instanceConfigs;
- privateInstanceConfigglobalInstanceConfig;
- privateMap<String,CanalConfigClient>managerClients;
- //監聽instanceconfig的變化
- privatebooleanautoScan=true;
- privateInstanceActiondefaultAction;
- privateMap<InstanceMode,InstanceConfigMonitor>instanceConfigMonitors;
- privateCanalServerWithEmbeddedembededCanalServer;
- privateCanalServerWithNettycanalServer;
- privateCanalInstanceGeneratorinstanceGenerator;
- privateZkClientxzkclientx;
- publicCanalController(){
- this(System.getProperties());
- }
- publicCanalController(finalPropertiesproperties){
- managerClients=MigrateMap.makeComputingMap(newFunction<String,CanalConfigClient>(){
- publicCanalConfigClientapply(StringmanagerAddress){
- returngetManagerClient(managerAddress);
- }
- });
- //1、配置解析
- globalInstanceConfig=initGlobalConfig(properties);
- instanceConfigs=newMapMaker().makeMap();
- initInstanceConfig(properties);
- //2、準備canalserver
- cid=Long.valueOf(getProperty(properties,CanalConstants.CANAL_ID));
- ip=getProperty(properties,CanalConstants.CANAL_IP);
- port=Integer.valueOf(getProperty(properties,CanalConstants.CANAL_PORT));
- embededCanalServer=CanalServerWithEmbedded.instance();
- embededCanalServer.setCanalInstanceGenerator(instanceGenerator);//設定自定義的instanceGenerator
- canalServer=CanalServerWithNetty.instance();
- canalServer.setIp(ip);
- canalServer.setPort(port);
- //3、初始化zk相關程式碼
- //處理下ip為空,預設使用hostIp暴露到zk中
- if(StringUtils.isEmpty(ip)){
- ip=AddressUtils.getHostIp();
- }
- finalStringzkServers=getProperty(properties,CanalConstants.CANAL_ZKSERVERS);
- if(StringUtils.isNotEmpty(zkServers)){
- zkclientx=ZkClientx.getZkClient(zkServers);
- //初始化系統目錄
- zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE,true);
- zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE,true);
- }
- //4CanalInstance執行狀態監控
- finalServerRunningDataserverData=newServerRunningData(cid,ip+":"+port);
- ServerRunningMonitors.setServerData(serverData);
- ServerRunningMonitors.setRunningMonitors(//...);
- //5、autoScan機制相關程式碼
- autoScan=BooleanUtils.toBoolean(getProperty(properties,CanalConstants.CANAL_AUTO_SCAN));
- if(autoScan){
- defaultAction=newInstanceAction(){//....};
- instanceConfigMonitors=//....
- }
- }
- ....
- }
為了讀者能夠儘量容易的看出CanalController的構造方法中都做了什麼,上面程式碼片段中省略了部分程式碼。這樣,我們可以很明顯的看出來, ,在CanalController構造方法中的程式碼分劃分為了固定的幾個處理步驟,下面按照幾個步驟的劃分,逐一進行講解,並詳細的介紹CanalController中定義的各個欄位的作用。
3.1 配置解析相關程式碼
- //初始化全域性引數設定
- globalInstanceConfig=initGlobalConfig(properties);
- instanceConfigs=newMapMaker().makeMap();
- //初始化instanceconfig
- initInstanceConfig(properties);
3.1.1 globalInstanceConfig欄位
表示canal instance的全域性配置,型別為InstanceConfig,通過initGlobalConfig方法進行初始化。主要用於解析canal.properties
以下幾個配置項:
-
canal.instance.global.mode:確定canal instance配置載入方式,取值有manager|spring兩種方式
-
canal.instance.global.lazy:確定canal instance是否延遲初始化
-
canal.instance.global.manager.address:配置中心地址。如果canal.instance.global.mode=manager,需要提供此配置項
-
canal.instance.global.spring.xml:spring配置檔案路徑。如果canal.instance.global.mode=spring,需要提供此配置項
initGlobalConfig原始碼如下所示:
- privateInstanceConfiginitGlobalConfig(Propertiesproperties){
- InstanceConfigglobalConfig=newInstanceConfig();
- //讀取canal.instance.global.mode
- StringmodeStr=getProperty(properties,CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME));
- if(StringUtils.isNotEmpty(modeStr)){
- //將modelStr轉成列舉InstanceMode,這是一個列舉類,只有2個取值,SPRING\MANAGER,對應兩種配置方式
- globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr)));
- }
- //讀取canal.instance.global.lazy
- StringlazyStr=getProperty(properties,CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME));
- if(StringUtils.isNotEmpty(lazyStr)){
- globalConfig.setLazy(Boolean.valueOf(lazyStr));
- }
- //讀取canal.instance.global.manager.address
- StringmanagerAddress=getProperty(properties,
- CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME));
- if(StringUtils.isNotEmpty(managerAddress)){
- globalConfig.setManagerAddress(managerAddress);
- }
- //讀取canal.instance.global.spring.xml
- StringspringXml=getProperty(properties,CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME));
- if(StringUtils.isNotEmpty(springXml)){
- globalConfig.setSpringXml(springXml);
- }
- instanceGenerator=//...初始化instanceGenerator
- returnglobalConfig;
- }
其中canal.instance.global.mode
用於確定canal instance的全域性配置載入方式,其取值範圍有2個:spring
、manager
。我們知道一個canal server中可以啟動多個canal instance,每個instance都有各自的配置。instance的配置也可以放在本地,也可以放在遠端配置中心裡。我們可以自定義每個canal instance配置檔案儲存的位置,如果所有canal instance的配置都在本地或者遠端,此時我們就可以通過canal.instance.global.mode這個配置項,來統一的指定配置檔案的位置,避免為每個canal instance單獨指定。
其中:
spring方式:
表示所有的canal instance的配置檔案位於本地。此時,我們必須提供配置項canal.instance.global.spring.xml指定spring配置檔案的路徑。canal提供了多個spring配置檔案:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。這麼多配置檔案主要是為了支援canal instance不同的工作方式。我們在稍後將會講解各個配置檔案的區別。而在這些配置檔案的開頭,我們無一例外的可以看到以下配置:
- <beanclass="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer"lazy-init="false">
- <propertyname="ignoreResourceNotFound"value="true"/>
- <propertyname="systemPropertiesModeName"value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!--允許system覆蓋-->
- <propertyname="locationNames">
- <list>
- <value>classpath:canal.properties</value>
- <value>classpath:${canal.instance.destination:}/instance.properties</value>
- </list>
- </property>
- </bean>
這裡我們可以看到,所謂通過spring方式載入canal instance配置,無非就是通過spring提供的PropertyPlaceholderConfigurer來載入canal instance的配置檔案instance.properties。
這裡instance.properties的檔案完整路徑是${canal.instance.destination:}/instance.properties,其中${canal.instance.destination}是一個變數。這是因為我們可以在一個canal server中配置多個canal instance,每個canal instance配置檔案的名稱都是instance.properties,因此我們需要通過目錄進行區分。例如我們通過配置項canal.destinations指定多個canal instance的名字
- canal.destinations=example1,example2
此時我們就要conf目錄下,新建兩個子目錄example1和example2,每個目錄下各自放置一個instance.properties。
canal在初始化時就會分別使用example1和example2來替換${canal.instance.destination:},從而分別根據example1/instance.properties和example2/instance.properties建立2個canal instance。
manager方式:
表示所有的canal instance的配置檔案位於遠端配置中心,此時我們必須提供配置項 canal.instance.global.manager.address來指定遠端配置中心的地址。目前alibaba內部配置使用這種方式。開發者可以自己實現CanalConfigClient,連線各自的管理系統,完成接入。
3.1.2 instanceGenerator欄位
型別為CanalInstanceGenerator
。在initGlobalConfig方法中,除了建立了globalInstanceConfig例項,同時還為欄位instanceGenerator欄位進行了賦值。
顧名思義,這個欄位用於建立CanalInstance
例項。這是instance模組中的類,其作用就是為canal.properties檔案中canal.destinations
配置項列出的每個destination,建立一個CanalInstance例項。CanalInstanceGenerator是一個介面,定義如下所示:
- publicinterfaceCanalInstanceGenerator{
- /**
- *通過destination產生特定的{@linkCanalInstance}
- */
- CanalInstancegenerate(Stringdestination);
- }
針對spring和manager兩種instance配置的載入方式,CanalInstanceGenerator提供了兩個對應的實現類,如下所示:
instanceGenerator欄位通過一個匿名內部類進行初始化。其內部會判斷配置的各個destination的配置載入方式,spring 或者manager。
- instanceGenerator=newCanalInstanceGenerator(){
- publicCanalInstancegenerate(Stringdestination){
- //1、根據destination從instanceConfigs獲取對應的InstanceConfig物件
- InstanceConfigconfig=instanceConfigs.get(destination);
- if(config==null){
- thrownewCanalServerException("can'tfinddestination:{}");
- }
- //2、如果destination對應的InstanceConfig的mode是manager方式,使用ManagerCanalInstanceGenerator
- if(config.getMode().isManager()){
- ManagerCanalInstanceGeneratorinstanceGenerator=newManagerCanalInstanceGenerator();
- instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
- returninstanceGenerator.generate(destination);
- }elseif(config.getMode().isSpring()){
- //3、如果destination對應的InstanceConfig的mode是spring方式,使用SpringCanalInstanceGenerator
- SpringCanalInstanceGeneratorinstanceGenerator=newSpringCanalInstanceGenerator();
- synchronized(this){
- try{
- //設定當前正在載入的通道,載入spring查詢檔案時會用到該變數
- System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY,destination);
- instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
- returninstanceGenerator.generate(destination);
- }catch(Throwablee){
- logger.error("generatorinstancefailed.",e);
- thrownewCanalException(e);
- }finally{
- System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY,"");
- }
- }
- }else{
- thrownewUnsupportedOperationException("unknowmode:"+config.getMode());
- }
- }
- };
上述程式碼中的第1步比較變態,從instanceConfigs中根據destination作為引數,獲得對應的InstanceConfig。而instanceConfigs目前還沒有被初始化,這個欄位是在稍後將後將要講解的initInstanceConfig方法初始化的,不過由於這是一個引用型別,當initInstanceConfig方法被執行後,instanceConfigs欄位中也就有值了。目前,我們姑且認為, instanceConfigs這個Map<String, InstanceConfig>型別的欄位已經被初始化好了。
2、3兩步用於確定是instance的配置載入方式是spring還是manager,如果是spring,就使用SpringCanalInstanceGenerator建立CanalInstance例項,如果是manager,就使用ManagerCanalInstanceGenerator建立CanalInstance例項。
由於目前manager方式的原始碼並未開源,因此,我們只分析SpringCanalInstanceGenerator相關程式碼。
上述程式碼中,首先建立了一個SpringCanalInstanceGenerator例項,然後往裡面設定了一個BeanFactory。
- instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
其中config.getSpringXml()返回的就是我們在canal.properties中通過canal.instance.global.spring.xml配置項指定了spring配置檔案路徑。getBeanFactory方法原始碼如下所示:
- privateBeanFactorygetBeanFactory(StringspringXml){
- ApplicationContextapplicationContext=newClassPathXmlApplicationContext(springXml);
- returnapplicationContext;
- }
往SpringCanalInstanceGenerator
設定了BeanFactory之後,就可以通過其的generate方法獲得CanalInstance例項。
SpringCanalInstanceGenerator的原始碼如下所示:
- publicclassSpringCanalInstanceGeneratorimplementsCanalInstanceGenerator,BeanFactoryAware{
- privateStringdefaultName="instance";
- privateBeanFactorybeanFactory;
- publicCanalInstancegenerate(Stringdestination){
- StringbeanName=destination;
- //首先判斷beanFactory是否包含以destination為id的bean
- if(!beanFactory.containsBean(beanName)){
- beanName=defaultName;//如果沒有,設定要獲取的bean的id為instance。
- }
- //以預設的bean的id值"instance"來獲取CanalInstance例項
- return(CanalInstance)beanFactory.getBean(beanName);
- }
- publicvoidsetBeanFactory(BeanFactorybeanFactory)throwsBeansException{
- this.beanFactory=beanFactory;
- }
- }
首先嚐試以傳入的引數destination來獲取CanalInstance例項,如果沒有,就以預設的bean的id值"instance"來獲取CanalInstance例項。事實上,如果你沒有修改spring配置檔案,那麼預設的名字就是instance。事實上,在canal提供的各個spring配置檔案xxx-instance.xml中,都有類似以下配置:
- <beanid="instance"class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
- <propertyname="destination"value="${canal.instance.destination}"/>
- <propertyname="eventParser">
- <reflocal="eventParser"/>
- </property>
- <propertyname="eventSink">
- <reflocal="eventSink"/>
- </property>
- <propertyname="eventStore">
- <reflocal="eventStore"/>
- </property>
- <propertyname="metaManager">
- <reflocal="metaManager"/>
- </property>
- <propertyname="alarmHandler">
- <reflocal="alarmHandler"/>
- </property>
- </bean>
上面的程式碼片段中,我們看到的確有一個bean的名字是instance,其型別是CanalInstanceWithSpring
,這是CanalInstance介面的實現類。類似的,我們可以想到在manager配置方式下,獲取的CanalInstance實現類是CanalInstanceWithManager
。事實上,你想的沒錯,CanalInstance的類圖繼承關係如下所示:
需要注意的是,到目前為止,我們只是建立好了CanalInstanceGenerator,而CanalInstance尚未建立。在CanalController的start方法被呼叫時,CanalInstance才會被真正的建立,相關原始碼將在稍後分析。
3.1.3 instanceConfigs欄位
型別為Map<String, InstanceConfig>。前面提到初始化instanceGenerator後,當其generate方法被呼叫時,會嘗試從instanceConfigs根據一個destination獲取對應的InstanceConfig
,現在分析instanceConfigs的相關初始化程式碼。
我們知道globalInstanceConfig定義全域性的配置載入方式。如果需要把部分CanalInstance配置放於本地,另外一部分CanalIntance配置放於遠端配置中心,則只通過全域性方式配置,無法達到這個要求。雖然這種情況很少見,但是為了提供最大的靈活性,canal支援每個CanalIntance自己來定義自己的載入方式,來覆蓋預設的全域性配置載入方式。而每個destination對應的InstanceConfig配置就存放於instanceConfigs欄位中。
舉例來說:
- //當前server上部署的instance列表
- canal.destinations=instance1,instance2
- //instance配置全域性載入方式
- canal.instance.global.mode=spring
- canal.instance.global.lazy=false
- canal.instance.global.spring.xml=classpath:spring/file-instance.xml
- //instance1覆蓋全域性載入方式
- canal.instance.instance1.mode=manager
- canal.instance.instance1.manager.address=127.0.0.1:1099
- canal.instance.instance1.lazy=tue
這段配置中,設定了instance的全域性載入方式為spring,instance1覆蓋了全域性配置,使用manager方式載入配置。而instance2沒有覆蓋配置,因此預設使用spring載入方式。
instanceConfigs欄位通過initInstanceConfig方法進行初始化
- instanceConfigs=newMapMaker().makeMap();//這裡利用GoogleGuava框架的MapMaker建立Map例項並賦值給instanceConfigs
- //初始化instanceconfig
- initInstanceConfig(properties);
initInstanceConfig方法原始碼如下:
- privatevoidinitInstanceConfig(Propertiesproperties){
- //讀取配置項canal.destinations
- StringdestinationStr=getProperty(properties,CanalConstants.CANAL_DESTINATIONS);
- //以","分割canal.destinations,得到一個數組形式的destination
- String[]destinations=StringUtils.split(destinationStr,CanalConstants.CANAL_DESTINATION_SPLIT);
- for(Stringdestination:destinations){
- //為每一個destination生成一個InstanceConfig例項
- InstanceConfigconfig=parseInstanceConfig(properties,destination);
- //將destination對應的InstanceConfig放入instanceConfigs中
- InstanceConfigoldConfig=instanceConfigs.put(destination,config);
- if(oldConfig!=null){
- logger.warn("destination:{}oldconfig:{}hasreplacebynewconfig:{}",newObject[]{destination,
- oldConfig,config});
- }
- }
- }
上面程式碼片段中,首先解析canal.destinations配置項,可以理解一個destination就對應要初始化一個canal instance。針對每個destination會建立各自的InstanceConfig,最終都會放到instanceConfigs這個Map中。
各個destination對應的InstanceConfig都是通過parseInstanceConfig方法來解析
- privateInstanceConfigparseInstanceConfig(Propertiesproperties,Stringdestination){
- //每個destination對應的InstanceConfig都引用了全域性的globalInstanceConfig
- InstanceConfigconfig=newInstanceConfig(globalInstanceConfig);
- //...其他幾個配置項與獲取globalInstanceConfig類似,不再贅述,唯一注意的的是配置項的key部分中的global變成傳遞進來的destination
- returnconfig;
- }
此時我們可以看一下InstanceConfig類的原始碼:
- publicclassInstanceConfig{
- privateInstanceConfigglobalConfig;
- privateInstanceModemode;
- privateBooleanlazy;
- privateStringmanagerAddress;
- privateStringspringXml;
- publicInstanceConfig(){
- }
- publicInstanceConfig(InstanceConfigglobalConfig){
- this.globalConfig=globalConfig;
- }
- publicstaticenumInstanceMode{
- SPRING,MANAGER;
- publicbooleanisSpring(){
- returnthis==InstanceMode.SPRING;
- }
- publicbooleanisManager(){
- returnthis==InstanceMode.MANAGER;
- }
- }
- publicBooleangetLazy(){
- if(lazy==null&&globalConfig!=null){
- returnglobalConfig.getLazy();
- }else{
- returnlazy;
- }
- }
- publicvoidsetLazy(Booleanlazy){
- this.lazy=lazy;
- }
- publicInstanceModegetMode(){
- if(mode==null&&globalConfig!=null){
- returnglobalConfig.getMode();
- }else{
- returnmode;
- }
- }
- publicvoidsetMode(InstanceModemode){
- this.mode=mode;
- }
- publicStringgetManagerAddress(){
- if(managerAddress==null&&globalConfig!=null){
- returnglobalConfig.getManagerAddress();
- }else{
- returnmanagerAddress;
- }
- }
- publicvoidsetManagerAddress(StringmanagerAddress){
- this.managerAddress=managerAddress;
- }
- publicStringgetSpringXml(){
- if(springXml==null&&globalConfig!=null){
- returnglobalConfig.getSpringXml();
- }else{
- returnspringXml;
- }
- }
- publicvoidsetSpringXml(StringspringXml){
- this.springXml=springXml;
- }
- publicStringtoString(){
- returnToStringBuilder.reflectionToString(this,CanalToStringStyle.DEFAULT_STYLE);
- }
- }
可以看到,InstanceConfig類中維護了一個globalConfig欄位,其型別也是InstanceConfig。而其相關get方法在執行時,會按照以下邏輯進行判斷:如果沒有自身沒有這個配置,則返回全域性配置,如果有,則返回自身的配置。通過這種方式實現對全域性配置的覆蓋。
3.2 準備canal server相關程式碼
- cid=Long.valueOf(getProperty(properties,CanalConstants.CANAL_ID));
- ip=getProperty(properties,CanalConstants.CANAL_IP);
- port=Integer.valueOf(getProperty(properties,CanalConstants.CANAL_PORT));
- embededCanalServer=CanalServerWithEmbedded.instance();
- embededCanalServer.setCanalInstanceGenerator(instanceGenerator);//設定自定義的instanceGenerator
- canalServer=CanalServerWithNetty.instance();
- canalServer.setIp(ip);
- canalServer.setPort(port);
上述程式碼中,首先解析了cid、ip、port欄位,其中:
cid:Long,對應canal.properties檔案中的canal.id,目前無實際用途
ip:String,對應canal.properties檔案中的canal.ip,canal server監聽的ip。
port:int,對應canal.properties檔案中的canal.port,canal server監聽的埠
之後分別為以下兩個欄位賦值:
embededCanalServer:型別為CanalServerWithEmbedded
canalServer:型別為CanalServerWithNetty
CanalServerWithEmbedded
和CanalServerWithNetty
都實現了CanalServer介面,且都實現了單例模式,通過靜態方法instance獲取例項。
關於這兩種型別的實現,canal官方文件有以下描述:
說白了,就是我們可以不必獨立部署canal server。在應用直接使用CanalServerWithEmbedded直連mysql資料庫。如果覺得自己的技術hold不住相關程式碼,就獨立部署一個canal server,使用canal提供的客戶端,連線canal server獲取binlog解析後資料。而CanalServerWithNetty是在CanalServerWithEmbedded的基礎上做的一層封裝,用於與客戶端通訊。
在獨立部署canal server時,Canal客戶端傳送的所有請求都交給CanalServerWithNetty處理解析,解析完成之後委派給了交給CanalServerWithEmbedded進行處理。因此CanalServerWithNetty就是一個馬甲而已。CanalServerWithEmbedded才是核心。
因此,在上述程式碼中,我們看到,用於生成CanalInstance例項的instanceGenerator被設定到了CanalServerWithEmbedded中,而ip和port被設定到CanalServerWithNetty中。
關於CanalServerWithNetty如何將客戶端的請求委派給CanalServerWithEmbedded進行處理,我們將在server模組原始碼分析中進行講解。
3.3 初始化zk相關程式碼
- //讀取canal.properties中的配置項canal.zkServers,如果沒有這個配置,則表示專案不使用zk
- finalStringzkServers=getProperty(properties,CanalConstants.CANAL_ZKSERVERS);
- if(StringUtils.isNotEmpty(zkServers)){
- //建立zk例項
- zkclientx=ZkClientx.getZkClient(zkServers);
- //初始化系統目錄
- //destination列表,路徑為/otter/canal/destinations
- zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE,true);
- //整個canalserver的叢集列表,路徑為/otter/canal/cluster
- zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE,true);
- }
canal支援利用了zk來完成HA機制、以及將當前消費到到的mysql的binlog位置記錄到zk中。ZkClientx是canal對ZkClient進行了一層簡單的封裝。
顯然,當我們沒有配置canal.zkServers,那麼zkclientx不會被初始化。
關於Canal如何利用ZK做HA,我們將在稍後的程式碼中進行分。而利用zk記錄binlog的消費進度,將在之後的章節進行分析。
3.4 CanalInstance執行狀態監控相關程式碼
由於這段程式碼比較長且噁心,這裡筆者暫時對部分程式碼進行省略,以便讀者看清楚整各脈絡
- finalServerRunningDataserverData=newServerRunningData(cid,ip+":"+port);
- ServerRunningMonitors.setServerData(serverData);
- ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(newFunction<String,ServerRunningMonitor>(){
- publicServerRunningMonitorapply(finalStringdestination){
- ServerRunningMonitorrunningMonitor=newServerRunningMonitor(serverData);
- runningMonitor.setDestination(destination);
- runningMonitor.setListener(newServerRunningListener(){....});//省略ServerRunningListener的具體實現
- if(zkclientx!=null){
- runningMonitor.setZkClient(zkclientx);
- }
- //觸發建立一下cid節點
- runningMonitor.init();
- returnrunningMonitor;
- }
- }));
上述程式碼中,ServerRunningMonitors
是ServerRunningMonitor物件的容器,而ServerRunningMonitor
用於監控CanalInstance。
canal會為每一個destination建立一個CanalInstance,每個CanalInstance都會由一個ServerRunningMonitor來進行監控。而ServerRunningMonitor統一由ServerRunningMonitors進行管理。
除了CanalInstance需要監控,CanalServer本身也需要監控。因此我們在程式碼一開始,就看到往ServerRunningMonitors設定了一個ServerRunningData物件,封裝了canal server監聽的ip和埠等資訊。
ServerRunningMonitors原始碼如下所示:
- publicclassServerRunningMonitors{
- privatestaticServerRunningDataserverData;
- privatestaticMaprunningMonitors;//<String,ServerRunningMonitor>
- publicstaticServerRunningDatagetServerData(){
- returnserverData;
- }
- publicstaticMap<String,ServerRunningMonitor>getRunningMonitors(){
- returnrunningMonitors;
- }
- publicstaticServerRunningMonitorgetRunningMonitor(Stringdestination){
- return(ServerRunningMonitor)runningMonitors.get(destination);
- }
- publicstaticvoidsetServerData(ServerRunningDataserverData){
- ServerRunningMonitors.serverData=serverData;
- }
- publicstaticvoidsetRunningMonitors(MaprunningMonitors){
- ServerRunningMonitors.runningMonitors=runningMonitors;
- }
- }
ServerRunningMonitors的setRunningMonitors方法接收的引數是一個Map,其中Map的key是destination,value是ServerRunningMonitor,也就是說針對每一個destination都有一個ServerRunningMonitor來監控。
上述程式碼中,在往ServerRunningMonitors設定Map時,是通過MigrateMap.makeComputingMap方法來建立的,其接受一個Function型別的引數,這是guava中定義的介面,其聲明瞭apply抽象方法。其工作原理可以通過下面程式碼片段進行介紹:
- Map<String,User>map=MigrateMap.makeComputingMap(newFunction<String,User>(){
- @Override
- publicUserapply(Stringname){
- returnnewUser(name);
- }
- });
- Useruser=map.get("tianshouzhi");//第一次獲取時會建立
- assertuser!=null;
- assertuser==map.get("tianshouzhi");//之後獲取,總是返回之前已經建立的物件
這段程式碼中,我們利用MigrateMap.makeComputingMap建立了一個Map,其中key為String型別,value為User型別。當我們呼叫map.get("tianshouzhi")方法,最開始這個Map中並沒有任何key/value的,於是其就會回撥Function的apply方法,利用引數"tianshouzhi"建立一個User物件並返回。之後當我們再以"tianshouzhi"為key從Map中獲取User物件時,會直接將前面建立的物件返回。不會回撥apply方法,也就是說,只有在第一次嘗試獲取時,才會回撥apply方法。
而在上述程式碼中,實際上就利用了這個特性,只不過是根據destination獲取ServerRunningMonitor物件,如果不存在就建立。
在建立ServerRunningMonitor物件時,首先根據ServerRunningData建立ServerRunningMonitor例項,之後設定了destination和ServerRunningListener
物件,接著,判斷如果zkClientx欄位如果不為空,也設定到ServerRunningMonitor中,最後呼叫init方法進行初始化。
- ServerRunningMonitorrunningMonitor=newServerRunningMonitor(serverData);
- runningMonitor.setDestination(destination);
- runningMonitor.setListener(newServerRunningListener(){...})//省略ServerRunningListener具體程式碼
- if(zkclientx!=null){
- runningMonitor.setZkClient(zkclientx);
- }
- //觸發建立一下cid節點
- runningMonitor.init();
- returnrunningMonitor;
ServerRunningListener的實現如下:
- newServerRunningListener(){
- /*內部呼叫了embededCanalServer的start(destination)方法。
- 此處需要劃重點,說明每個destination對應的CanalInstance是通過embededCanalServer的start方法啟動的,
- 這與我們之前分析將instanceGenerator設定到embededCanalServer中可以對應上。
- embededCanalServer負責呼叫instanceGenerator生成CanalInstance例項,並負責其啟動。*/
- publicvoidprocessActiveEnter(){
- try{
- MDC.put(CanalConstants.MDC_DESTINATION,String.valueOf(destination));
- embededCanalServer.start(destination);
- }finally{
- MDC.remove(CanalConstants.MDC_DESTINATION);
- }
- }
- //內部呼叫embededCanalServer的stop(destination)方法。與上start方法類似,只不過是停止CanalInstance。
- publicvoidprocessActiveExit(){
- try{
- MDC.put(CanalConstants.MDC_DESTINATION,String.valueOf(destination));
- embededCanalServer.stop(destination);
- }finally{
- MDC.remove(CanalConstants.MDC_DESTINATION);
- }
- }
- /*處理存在zk的情況下,在Canalinstance啟動之前,在zk中建立節點。
- 路徑為:/otter/canal/destinations/{0}/cluster/{1},其0會被destination替換,1會被ip:port替換。
- 此方法會在processActiveEnter()之前被呼叫*/
- publicvoidprocessStart(){
- try{
- if(zkclientx!=null){
- finalStringpath=ZookeeperPathUtils.getDestinationClusterNode(destination,ip+":"+port);
- initCid(path);
- zkclientx.subscribeStateChanges(newIZkStateListener(){
- publicvoidhandleStateChanged(KeeperStatestate)throwsException{
- }
- publicvoidhandleNewSession()throwsException{
- initCid(path);
- }
- });
- }
- }finally{
- MDC.remove(CanalConstants.MDC_DESTINATION);
- }
- }
- //處理存在zk的情況下,在Canalinstance停止前,釋放zk節點,路徑為/otter/canal/destinations/{0}/cluster/{1},
- //其0會被destination替換,1會被ip:port替換。此方法會在processActiveExit()之前被呼叫
- publicvoidprocessStop(){
- try{
- MDC.put(CanalConstants.MDC_DESTINATION,String.valueOf(destination));
- if(zkclientx!=null){
- finalStringpath=ZookeeperPathUtils.getDestinationClusterNode(destination,ip+":"+port);
- releaseCid(path);
- }
- }finally{
- MDC.remove(CanalConstants.MDC_DESTINATION);
- }
- }
- }
上述程式碼中,我們可以看到啟動一個CanalInstance實際上是在ServerRunningListener的processActiveEnter方法中,通過呼叫embededCanalServer的start(destination)方法進行的,對於停止也是類似。
那麼ServerRunningListener中的相關方法到底是在哪裡回撥的呢?我們可以在ServerRunningMonitor的start和stop方法中找到答案,這裡只列出start方法。
- publicclassServerRunningMonitorextendsAbstractCanalLifeCycle{
- ...
- publicvoidstart(){
- super.start();
- processStart();//其內部會呼叫ServerRunningListener的processStart()方法
- if(zkClient!=null){//存在zk,以HA方式啟動
- //如果需要儘可能釋放instance資源,不需要監聽running節點,不然即使stop了這臺機器,另一臺機器立馬會start
- Stringpath=ZookeeperPathUtils.getDestinationServerRunning(destination);
- zkClient.subscribeDataChanges(path,dataListener);
- initRunning();
- }else{//沒有zk,直接啟動
- processActiveEnter();
- }
- }
- //...stop方法邏輯類似,相關程式碼省略
- }
當ServerRunningMonitor的start方法被呼叫時,其首先會直接呼叫processStart方法,這個方法內部直接調了ServerRunningListener的processStart()方法,原始碼如下所示。通過前面的分析,我們已經知道在存在zkClient!=null的情況,會往zk中建立一個節點。
- privatevoidprocessStart(){
- if(listener!=null){
- try{
- listener.processStart();
- }catch(Exceptione){
- logger.error("processStartfailed",e);
- }
- }
- }
之後會判斷是否存在zkClient,如果不存在,則以本地方式啟動,如果存在,則以HA方式啟動。我們知道,canal server可以部署成兩種方式:叢集方式或者獨立部署。其中叢集方式是利用zk來做HA,獨立部署則可以直接進行啟動。我們先來看比較簡單的直接啟動。
直接啟動:
不存在zk的情況下,會進入else程式碼塊,呼叫processActiveEnter方法,其內部呼叫了listener的processActiveEnter,啟動相應destination對應的CanalInstance。
- privatevoidprocessActiveEnter(){
- if(listener!=null){
- try{
- listener.processActiveEnter();
- }catch(Exceptione){
- logger.error("processActiveEnterfailed",e);
- }
- }
- }
HA方式啟動:
存在zk,說明canal server可能做了叢集,因為canal就是利用zk來做HA的。首先根據destination構造一個zk的節點路徑,然後進行監聽。
- /*構建臨時節點的路徑:/otter/canal/destinations/{0}/running,其中佔位符{0}會被destination替換。
- 在叢集模式下,可能會有多個canalserver共同處理同一個destination,
- 在某一時刻,只能由一個canalserver進行處理,處理這個destination的canalserver進入running狀態,其他canalserver進入standby狀態。*/
- Stringpath=ZookeeperPathUtils.getDestinationServerRunning(destination);
- /*對destination對應的running節點進行監聽,一旦發生了變化,則說明可能其他處理相同destination的canalserver可能出現了異常,
- 此時需要嘗試自己進入running狀態。*/
- zkClient.subscribeDataChanges(path,dataListener);
上述只是監聽程式碼,之後嘗試呼叫initRunning方法通過HA的方式來啟動CanalInstance。
- privatevoidinitRunning(){
- if(!isStart()){
- return;
- }
- //構建臨時節點的路徑:/otter/canal/destinations/{0}/running,其中佔位符{0}會被destination替換
- Stringpath=ZookeeperPathUtils.getDestinationServerRunning(destination);
- //序列化
- //構建臨時節點的資料,標記當前destination由哪一個canalserver處理
- byte[]bytes=JsonUtils.marshalToByte(serverData);
- try{
- mutex.set(false);
- //嘗試建立臨時節點。如果節點已經存在,說明是其他的canalserver已經啟動了這個canalinstance。
- //此時會丟擲ZkNodeExistsException,進入catch程式碼塊。
- zkClient.create(path,bytes,CreateMode.EPHEMERAL);
- activeData=serverData;
- processActiveEnter();//如果建立成功,觸發一下事件,內部呼叫ServerRunningListener的processActiveEnter方法
- mutex.set(true);
- }catch(ZkNodeExistsExceptione){
- //建立節點失敗,則根據path從zk中獲取當前是哪一個canalserver建立了當前canalinstance的相關資訊。
- //第二個引數true,表示的是,如果這個path不存在,則返回null。
- bytes=zkClient.readData(path,true);
- if(bytes==null){//如果不存在節點,立即嘗試一次
- initRunning();
- }else{
- //如果的確存在,則將建立該canalinstance例項資訊存入activeData中。
- activeData=JsonUtils.unmarshalFromByte(bytes,ServerRunningData.class);
- }
- }catch(ZkNoNodeExceptione){//如果/otter/canal/destinations/{0}/節點不存在,進行建立其中佔位符{0}會被destination替換
- zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination),true);
- //嘗試建立父節點
- initRunning();
- }
- }
可以看到,initRunning方法內部只有在嘗試在zk中建立節點成功後,才會去呼叫listener的processActiveEnter方法來真正啟動destination對應的canal instance,這是canal HA方式啟動的核心。canal官方文件中介紹了CanalServer
HA機制啟動的流程,如下:
事實上,這個說明的前兩步,都是在initRunning方法中實現的。從上面的程式碼中,我們可以看出,在HA機啟動的情況下,initRunning方法不一定能走到processActiveEnter()方法,因為建立臨時節點可能會出錯。
此外,根據官方文件說明,如果出錯,那麼當前canal instance則進入standBy狀態。也就是另外一個canal instance出現異常時,當前canal instance頂上去。那麼相關原始碼在什麼地方呢?在HA方式啟動最開始的2行程式碼的監聽邏輯中:
- Stringpath=ZookeeperPathUtils.getDestinationServerRunning(destination);
- zkClient.subscribeDataChanges(path,dataListener);
其中dataListener型別是IZkDataListener
,這是zkclient客戶端提供的介面,定義如下:
- publicinterfaceIZkDataListener{
- publicvoidhandleDataChange(StringdataPath,Objectdata)throwsException;
- publicvoidhandleDataDeleted(StringdataPath)throwsException;
- }
當zk節點中的資料發生變更時,會自動回撥這兩個方法,很明顯,一個是用於處理節點資料發生變化,一個是用於處理節點資料被刪除。
而dataListener是在ServerRunningMonitor的構造方法中初始化的,如下:
- publicServerRunningMonitor(){
- //建立父節點
- dataListener=newIZkDataListener(){
- //!!!目前看來,好像並沒有存在修改running節點資料的程式碼,為什麼這個方法不是空實現?
- publicvoidhandleDataChange(StringdataPath,Objectdata)throwsException{
- MDC.put("destination",destination);
- ServerRunningDatarunningData=JsonUtils.unmarshalFromByte((byte[])data,ServerRunningData.class);
- if(!isMine(runningData.getAddress())){
- mutex.set(false);
- }
- if(!runningData.isActive()&&isMine(runningData.getAddress())){//說明出現了主動釋放的操作,並且本機之前是active
- release=true;
- releaseRunning();//徹底釋放mainstem}
- activeData=(ServerRunningData)runningData;
- }
- //當其他canalinstance出現異常,臨時節點資料被刪除時,會自動回撥這個方法,此時當前canalinstance要頂上去
- publicvoidhandleDataDeleted(StringdataPath)throwsException{
- MDC.put("destination",destination);
- mutex.set(false);
- if(!release&&activeData!=null&&isMine(activeData.getAddress())){
- //如果上一次active的狀態就是本機,則即時觸發一下active搶佔
- initRunning();
- }else{
- //否則就是等待delayTime,避免因網路瞬端或者zk異常,導致出現頻繁的切換操作
- delayExector.schedule(newRunnable(){
- publicvoidrun(){
- initRunning();//嘗試自己進入running狀態
- }
- },delayTime,TimeUnit.SECONDS);
- }
- }
- };
- }
那麼現在問題來了?ServerRunningMonitor的start方法又是在哪裡被呼叫的, 這個方法被呼叫了,才能真正的啟動canal instance。這部分程式碼我們放到後面的CanalController中的start方法進行講解。
下面分析最後一部分程式碼,autoScan機制相關程式碼。
3.5 autoScan機制相關程式碼
關於autoscan,官方文件有以下介紹:
結合autoscan機制的相關原始碼:
- //
- autoScan=BooleanUtils.toBoolean(getProperty(properties,CanalConstants.CANAL_AUTO_SCAN));
- if(autoScan){
- defaultAction=newInstanceAction(){//....};
- instanceConfigMonitors=//....
- }
可以看到,autoScan是否需要自動掃描的開關,只有當autoScan為true時,才會初始化defaultAction欄位和instanceConfigMonitors欄位。其中:
其中:
defaultAction:其作用是如果配置發生了變更,預設應該採取什麼樣的操作。其實現了InstanceAction
介面定義的三個抽象方法:start、stop和reload。當新增一個destination配置時,需要呼叫start方法來啟動;當移除一個destination配置時,需要呼叫stop方法來停止;當某個destination配置發生變更時,需要呼叫reload方法來進行重啟。
instanceConfigMonitors:型別為Map<InstanceMode, InstanceConfigMonitor>。defaultAction欄位只是定義了配置發生變化預設應該採取的操作,那麼總該有一個類來監聽配置是否發生了變化,這就是InstanceConfigMonitor的作用。官方文件中,只提到了對canal.conf.dir配置項指定的目錄的監聽,這指的是通過spring方式載入配置。顯然的,通過manager方式載入配置,配置中心的內容也是可能發生變化的,也需要進行監聽。此時可以理解為什麼instanceConfigMonitors的型別是一個Map,key為InstanceMode,就是為了對這兩種方式的配置載入方式都進行監聽。
defaultAction欄位初始化原始碼如下所示:
- defaultAction=newInstanceAction(){
- publicvoidstart(Stringdestination){
- InstanceConfigconfig=instanceConfigs.get(destination);
- if(config==null){
- //重新讀取一下instanceconfig
- config=parseInstanceConfig(properties,destination);
- instanceConfigs.put(destination,config);
- }
- if(!embededCanalServer.isStart(destination)){
- //HA機制啟動
- ServerRunningMonitorrunningMonitor=ServerRunningMonitors.getRunningMonitor(destination);
- if(!config.getLazy()&&!runningMonitor.isStart()){
- runningMonitor.start();
- }
- }
- }
- publicvoidstop(Stringdestination){
- //此處的stop,代表強制退出,非HA機制,所以需要退出HA的monitor和配置資訊
- InstanceConfigconfig=instanceConfigs.remove(destination);
- if(config!=null){
- embededCanalServer.stop(destination);
- ServerRunningMonitorrunningMonitor=ServerRunningMonitors.getRunningMonitor(destination);
- if(runningMonitor.isStart()){
- runningMonitor.stop();
- }
- }
- }
- publicvoidreload(Stringdestination){
- //目前任何配置變化,直接重啟,簡單處理
- stop(destination);
- start(destination);
- }
- };
instanceConfigMonitors欄位初始化原始碼如下所示:
- instanceConfigMonitors=MigrateMap.makeComputingMap(newFunction<InstanceMode,InstanceConfigMonitor>(){
- publicInstanceConfigMonitorapply(InstanceModemode){
- intscanInterval=Integer.valueOf(getProperty(properties,CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
- if(mode.isSpring()){//如果載入方式是spring,返回SpringInstanceConfigMonitor
- SpringInstanceConfigMonitormonitor=newSpringInstanceConfigMonitor();
- monitor.setScanIntervalInSecond(scanInterval);
- monitor.setDefaultAction(defaultAction);
- //設定conf目錄,預設是user.dir+conf目錄組成
- StringrootDir=getProperty(properties,CanalConstants.CANAL_CONF_DIR);
- if(StringUtils.isEmpty(rootDir)){
- rootDir="../conf";
- }
- if(StringUtils.equals("otter-canal",System.getProperty("appName"))){
- monitor.setRootConf(rootDir);
- }else{
- //eclipsedebug模式
- monitor.setRootConf("src/main/resources/");
- }
- returnmonitor;
- }elseif(mode.isManager()){//如果載入方式是manager,返回ManagerInstanceConfigMonitor
- returnnewManagerInstanceConfigMonitor();
- }else{
- thrownewUnsupportedOperationException("unknowmode:"+mode+"formonitor");
- }
- }
- });
可以看到instanceConfigMonitors也是根據mode屬性,來採取不同的監控實現類SpringInstanceConfigMonitor
或者ManagerInstanceConfigMonitor
,二者都實現了InstanceConfigMonitor
介面。
- publicinterfaceInstanceConfigMonitorextendsCanalLifeCycle{
- voidregister(Stringdestination,InstanceActionaction);
- voidunregister(Stringdestination);
- }
當需要對一個destination進行監聽時,呼叫register方法
當取消對一個destination監聽時,呼叫unregister方法。
事實上,unregister方法在canal 內部並沒有有任何地方被呼叫,也就是說,某個destination如果開啟了autoScan=true,那麼你是無法在執行時停止對其進行監控的。如果要停止,你可以選擇將對應的目錄刪除。
InstanceConfigMonitor本身並不知道哪些canal instance需要進行監控,因為不同的canal instance,有的可能設定autoScan為true,另外一些可能設定為false。
在CanalConroller的start方法中,對於autoScan為true的destination,會呼叫InstanceConfigMonitor的register方法進行註冊,此時InstanceConfigMonitor才會真正的對這個destination配置進行掃描監聽。對於那些autoScan為false的destination,則不會進行監聽。
目前SpringInstanceConfigMonitor對這兩個方法都進行了實現,而ManagerInstanceConfigMonitor目前對這兩個方法實現的都是空,需要開發者自己來實現。
在實現ManagerInstanceConfigMonitor時,可以參考SpringInstanceConfigMonitor。
此處不打算再繼續進行分析SpringInstanceConfigMonitor的原始碼,因為邏輯很簡單,感興趣的讀者可以自行檢視SpringInstanceConfigMonitor 的scan方法,內部在什麼情況下會回撥defualtAction的start、stop、reload方法 。
4 CanalController的start方法
而ServerRunningMonitor的start方法,是在CanalController中的start方法中被呼叫的,CanalController中的start方法是在CanalLauncher中被呼叫的。
com.alibaba.otter.canal.deployer.CanalController#start
- publicvoidstart()throwsThrowable{
- logger.info("##startthecanalserver[{}:{}]",ip,port);
- //建立整個canal的工作節點:/otter/canal/cluster/{0}
- finalStringpath=ZookeeperPathUtils.getCanalClusterNode(ip+":"+port);
- initCid(path);
- if(zkclientx!=null){
- this.zkclientx.subscribeStateChanges(newIZkStateListener(){
- publicvoidhandleStateChanged(KeeperStatestate)throwsException{
- }
- publicvoidhandleNewSession()throwsException{
- initCid(path);
- }
- });
- }
- //優先啟動embeded服務
- embededCanalServer.start();
- //啟動不是lazy模式的CanalInstance,通過迭代instanceConfigs,根據destination獲取對應的ServerRunningMonitor,然後逐一啟動
- for(Map.Entry<String,InstanceConfig>entry:instanceConfigs.entrySet()){
- finalStringdestination=entry.getKey();
- InstanceConfigconfig=entry.getValue();
- //如果destination對應的CanalInstance沒有啟動,則進行啟動
- if(!embededCanalServer.isStart(destination)){
- ServerRunningMonitorrunningMonitor=ServerRunningMonitors.getRunningMonitor(destination);
- //如果不是lazy,lazy模式需要等到第一次有客戶端請求才會啟動
- if(!config.getLazy()&&!runningMonitor.isStart()){
- runningMonitor.start();
- }
- }
- if(autoScan){
- instanceConfigMonitors.get(config.getMode()).register(destination,defaultAction);
- }
- }
- if(autoScan){//啟動配置檔案自動檢測機制
- instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
- for(InstanceConfigMonitormonitor:instanceConfigMonitors.values()){
- if(!monitor.isStart()){
- monitor.start();//啟動monitor
- }
- }
- }
- //啟動網路介面,監聽客戶端請求
- canalServer.start();
- }
5 總結
deployer模組的主要作用:
1、讀取canal.properties,確定canal instance的配置載入方式
2、確定canal instance的啟動方式:獨立啟動或者叢集方式啟動
3、監聽canal instance的配置的變化,動態停止、啟動或新增
4、啟動canal server,監聽客戶端請求