1. 程式人生 > 其它 >canal原始碼分析簡介-1

canal原始碼分析簡介-1

1.0 canal原始碼分析簡介

canal是阿里巴巴開源的mysql資料庫binlog的增量訂閱&消費元件。專案github地址為:https://github.com/alibaba/canal

本教程是從原始碼的角度來分析canal,適用於對canal有一定基礎的同學。本教程使用的版本是1.0.24,這也是筆者寫這篇教程時的最新穩定版,關於canal的基礎知識可以參考:https://github.com/alibaba/canal/wiki

下載專案原始碼

下載

  1. gitclonehttps://github.com/alibaba/canal.git

切換到canal-1.0.24這個tag

  1. gitcheckoutcanal-1.0.24

原始碼模組劃分

canal是基於maven構建的,總共分成了14個模組,如下所示:

模組雖多,但是每個模組的程式碼都很少。各個模組的作用如下所示:

common模組:主要是提供了一些公共的工具類和介面。

client模組:canal的客戶端。核心介面為CanalConnector

example模組:提供client模組使用案例。

protocol模組:client和server模組之間的通訊協議

deployer:部署模組。通過該模組提供的CanalLauncher來啟動canal server

server模組:canal伺服器端。核心介面為CanalServer

instance模組:一個server有多個instance。每個instance都會模擬成一個mysql例項的slave。instance模組有四個核心組成部分:parser模組、sink模組、store模組,meta模組。核心介面為CanalInstance

parser模組:資料來源接入,模擬slave協議和master進行互動,協議解析。parser模組依賴於dbsync、driver模組。

driver模組和dbsync模組:從這兩個模組的artifactId(canal.parse.driver、canal.parse.dbsync),就可以看出來,這兩個模組實際上是parser模組的元件。事實上parser 是通過driver模組與mysql建立連線,從而獲取到binlog。由於原始的binlog都是二進位制流,需要解析成對應的binlog事件,這些binlog事件物件都定義在dbsync模組中,dbsync 模組來自於淘寶的tddl。

sink模組:parser和store連結器,進行資料過濾,加工,分發的工作。核心介面為CanalEventSink

store模組:資料儲存。核心介面為CanalEventStore

meta模組:增量訂閱&消費資訊管理器,核心介面為CanalMetaManager,主要用於記錄canal消費到的mysql binlog的位置,

下面再通過一張圖來說明各個模組之間的依賴關係

通過deployer模組,啟動一個canal-server,一個cannal-server內部包含多個instance,每個instance都會偽裝成一個mysql例項的slave。client與server之間的通訊協議由protocol模組定義。client在訂閱binlog資訊時,需要傳遞一個destination引數,server會根據這個destination確定由哪一個instance為其提供服務。

在分析原始碼的時候,本人也是按照模組來劃分的,基本上一個模組對應一篇文章。

2.0 deployer模組

canal有兩種使用方式:1、獨立部署 2、內嵌到應用中。 deployer模組主要用於獨立部署canal server。關於這兩種方式的區別,請參見server模組原始碼分析。deployer模組原始碼目錄結構如下所示:

在獨立部署canal時,需要首先對canal的原始碼進行打包

  1. mvncleaninstall-Dmaven.test.skip-Denv=release

以本教程使用1.0.24版本為例,打包後會在target目錄生成一個以下兩個檔案:

其中canal.deployer-1.0.24.tar.gz就是canal的獨立部署包。解壓縮後,目錄如下所示。其中bin目錄和conf目錄(包括子目錄spring)中的所有檔案,都來自於deployer模組。

  1. canal
  2. ├──bin
  3. ├──startup.bat
  4. ├──startup.sh
  5. └──stop.sh
  6. ├──conf
  7. ├──canal.properties
  8. ├──example
  9. └──instance.properties
  10. ├──logback.xml
  11. └──spring
  12. ├──default-instance.xml
  13. ├──file-instance.xml
  14. ├──group-instance.xml
  15. ├──local-instance.xml
  16. └──memory-instance.xml
  17. ├──lib
  18. └──....依賴的各種jar
  19. └──logs

deployer模組主要完成以下功能:

1、讀取canal,properties配置檔案

2、啟動canal server,監聽canal client的請求

3、啟動canal instance,連線mysql資料庫,偽裝成slave,解析binlog

4、在canal的執行過程中,監聽配置檔案的變化

1、啟動和停止指令碼

bin目錄中包含了canal的啟動和停止指令碼startup.shstop.sh,當我們要啟動canal時,只需要輸入以下命令即可

  1. shbin/startup.sh

在windows環境下,可以直接雙擊startup.bat。

在startup.sh指令碼內,會呼叫com.alibaba.otter.canal.deployer.CanalLauncher類來進行啟動,這是分析Canal原始碼的入口類,如下圖所示:

同時,startup.sh還會在bin目錄下生成一個canal.pid檔案,用於儲存canal的程序id。當停止canal的時候

  1. shbin/stop.sh

會根據canal.pid檔案中記錄的程序id,kill掉canal程序,並且刪除這個檔案。

2、CannalLauncher

CanalLauncher是整個原始碼分析的入口類,程式碼相當簡單。步驟是:

1、讀取canal.properties檔案中的配置

2、利用讀取的配置構造一個CanalController例項,將所有的啟動操作都委派給CanalController進行處理。

3、最後註冊一個鉤子函式,在JVM停止時同時也停止canal server。

com.alibaba.otter.canal.deployer.CanalLauncher

  1. publicclassCanalLauncher{
  2. privatestaticfinalStringCLASSPATH_URL_PREFIX="classpath:";
  3. privatestaticfinalLoggerlogger=LoggerFactory.getLogger(CanalLauncher.class);
  4. publicstaticvoidmain(String[]args)throwsThrowable{
  5. try{
  6. //1、讀取canal.properties檔案中配置,預設讀取classpath下的canal.properties
  7. Stringconf=System.getProperty("canal.conf","classpath:canal.properties");
  8. Propertiesproperties=newProperties();
  9. if(conf.startsWith(CLASSPATH_URL_PREFIX)){
  10. conf=StringUtils.substringAfter(conf,CLASSPATH_URL_PREFIX);
  11. properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
  12. }else{
  13. properties.load(newFileInputStream(conf));
  14. }
  15. //2、啟動canal,首先將properties物件傳遞給CanalController,然後呼叫其start方法啟動
  16. logger.info("##startthecanalserver.");
  17. finalCanalControllercontroller=newCanalController(properties);
  18. controller.start();
  19. logger.info("##thecanalserverisrunningnow......");
  20. //3、關閉canal,通過新增JVM的鉤子,JVM停止前會回撥run方法,其內部呼叫controller.stop()方法進行停止
  21. Runtime.getRuntime().addShutdownHook(newThread(){
  22. publicvoidrun(){
  23. try{
  24. logger.info("##stopthecanalserver");
  25. controller.stop();
  26. }catch(Throwablee){
  27. logger.warn("##somethinggoeswrongwhenstoppingcanalServer:\n{}",
  28. ExceptionUtils.getFullStackTrace(e));
  29. }finally{
  30. logger.info("##canalserverisdown.");
  31. }
  32. }
  33. });
  34. }catch(Throwablee){
  35. logger.error("##SomethinggoeswrongwhenstartingupthecanalServer:\n{}",
  36. ExceptionUtils.getFullStackTrace(e));
  37. System.exit(0);
  38. }
  39. }
  40. }

可以看到,CanalLauncher實際上只是負責讀取canal.properties配置檔案,然後構造CanalController物件,並通過其start和stop方法來開啟和停止canal。因此,如果說CanalLauncher是canal原始碼分析的入口類,那麼CanalController就是canal原始碼分析的核心類。

3、CanalController

在CanalController的構造方法中,會對配置檔案內容解析,初始化相關成員變數,做好canal server的啟動前的準備工作,之後在CanalLauncher中呼叫CanalController.start方法來啟動。

CanalController中定義的相關欄位和構造方法,如下所示:

  1. publicclassCanalController{
  2. privatestaticfinalLoggerlogger=LoggerFactory.getLogger(CanalController.class);
  3. privateLongcid;
  4. privateStringip;
  5. privateintport;
  6. //預設使用spring的方式載入
  7. privateMap<String,InstanceConfig>instanceConfigs;
  8. privateInstanceConfigglobalInstanceConfig;
  9. privateMap<String,CanalConfigClient>managerClients;
  10. //監聽instanceconfig的變化
  11. privatebooleanautoScan=true;
  12. privateInstanceActiondefaultAction;
  13. privateMap<InstanceMode,InstanceConfigMonitor>instanceConfigMonitors;
  14. privateCanalServerWithEmbeddedembededCanalServer;
  15. privateCanalServerWithNettycanalServer;
  16. privateCanalInstanceGeneratorinstanceGenerator;
  17. privateZkClientxzkclientx;
  18. publicCanalController(){
  19. this(System.getProperties());
  20. }
  21. publicCanalController(finalPropertiesproperties){
  22. managerClients=MigrateMap.makeComputingMap(newFunction<String,CanalConfigClient>(){
  23. publicCanalConfigClientapply(StringmanagerAddress){
  24. returngetManagerClient(managerAddress);
  25. }
  26. });
  27. //1、配置解析
  28. globalInstanceConfig=initGlobalConfig(properties);
  29. instanceConfigs=newMapMaker().makeMap();
  30. initInstanceConfig(properties);
  31. //2、準備canalserver
  32. cid=Long.valueOf(getProperty(properties,CanalConstants.CANAL_ID));
  33. ip=getProperty(properties,CanalConstants.CANAL_IP);
  34. port=Integer.valueOf(getProperty(properties,CanalConstants.CANAL_PORT));
  35. embededCanalServer=CanalServerWithEmbedded.instance();
  36. embededCanalServer.setCanalInstanceGenerator(instanceGenerator);//設定自定義的instanceGenerator
  37. canalServer=CanalServerWithNetty.instance();
  38. canalServer.setIp(ip);
  39. canalServer.setPort(port);
  40. //3、初始化zk相關程式碼
  41. //處理下ip為空,預設使用hostIp暴露到zk中
  42. if(StringUtils.isEmpty(ip)){
  43. ip=AddressUtils.getHostIp();
  44. }
  45. finalStringzkServers=getProperty(properties,CanalConstants.CANAL_ZKSERVERS);
  46. if(StringUtils.isNotEmpty(zkServers)){
  47. zkclientx=ZkClientx.getZkClient(zkServers);
  48. //初始化系統目錄
  49. zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE,true);
  50. zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE,true);
  51. }
  52. //4CanalInstance執行狀態監控
  53. finalServerRunningDataserverData=newServerRunningData(cid,ip+":"+port);
  54. ServerRunningMonitors.setServerData(serverData);
  55. ServerRunningMonitors.setRunningMonitors(//...);
  56. //5、autoScan機制相關程式碼
  57. autoScan=BooleanUtils.toBoolean(getProperty(properties,CanalConstants.CANAL_AUTO_SCAN));
  58. if(autoScan){
  59. defaultAction=newInstanceAction(){//....};
  60. instanceConfigMonitors=//....
  61. }
  62. }
  63. ....
  64. }

為了讀者能夠儘量容易的看出CanalController的構造方法中都做了什麼,上面程式碼片段中省略了部分程式碼。這樣,我們可以很明顯的看出來, ,在CanalController構造方法中的程式碼分劃分為了固定的幾個處理步驟,下面按照幾個步驟的劃分,逐一進行講解,並詳細的介紹CanalController中定義的各個欄位的作用。

3.1 配置解析相關程式碼

  1. //初始化全域性引數設定
  2. globalInstanceConfig=initGlobalConfig(properties);
  3. instanceConfigs=newMapMaker().makeMap();
  4. //初始化instanceconfig
  5. initInstanceConfig(properties);

3.1.1 globalInstanceConfig欄位

表示canal instance的全域性配置,型別為InstanceConfig,通過initGlobalConfig方法進行初始化。主要用於解析canal.properties以下幾個配置項:

  • canal.instance.global.mode:確定canal instance配置載入方式,取值有manager|spring兩種方式

  • canal.instance.global.lazy:確定canal instance是否延遲初始化

  • canal.instance.global.manager.address:配置中心地址。如果canal.instance.global.mode=manager,需要提供此配置項

  • canal.instance.global.spring.xml:spring配置檔案路徑。如果canal.instance.global.mode=spring,需要提供此配置項

initGlobalConfig原始碼如下所示:

  1. privateInstanceConfiginitGlobalConfig(Propertiesproperties){
  2. InstanceConfigglobalConfig=newInstanceConfig();
  3. //讀取canal.instance.global.mode
  4. StringmodeStr=getProperty(properties,CanalConstants.getInstanceModeKey(CanalConstants.GLOBAL_NAME));
  5. if(StringUtils.isNotEmpty(modeStr)){
  6. //將modelStr轉成列舉InstanceMode,這是一個列舉類,只有2個取值,SPRING\MANAGER,對應兩種配置方式
  7. globalConfig.setMode(InstanceMode.valueOf(StringUtils.upperCase(modeStr)));
  8. }
  9. //讀取canal.instance.global.lazy
  10. StringlazyStr=getProperty(properties,CanalConstants.getInstancLazyKey(CanalConstants.GLOBAL_NAME));
  11. if(StringUtils.isNotEmpty(lazyStr)){
  12. globalConfig.setLazy(Boolean.valueOf(lazyStr));
  13. }
  14. //讀取canal.instance.global.manager.address
  15. StringmanagerAddress=getProperty(properties,
  16. CanalConstants.getInstanceManagerAddressKey(CanalConstants.GLOBAL_NAME));
  17. if(StringUtils.isNotEmpty(managerAddress)){
  18. globalConfig.setManagerAddress(managerAddress);
  19. }
  20. //讀取canal.instance.global.spring.xml
  21. StringspringXml=getProperty(properties,CanalConstants.getInstancSpringXmlKey(CanalConstants.GLOBAL_NAME));
  22. if(StringUtils.isNotEmpty(springXml)){
  23. globalConfig.setSpringXml(springXml);
  24. }
  25. instanceGenerator=//...初始化instanceGenerator
  26. returnglobalConfig;
  27. }

其中canal.instance.global.mode用於確定canal instance的全域性配置載入方式,其取值範圍有2個:springmanager。我們知道一個canal server中可以啟動多個canal instance,每個instance都有各自的配置。instance的配置也可以放在本地,也可以放在遠端配置中心裡。我們可以自定義每個canal instance配置檔案儲存的位置,如果所有canal instance的配置都在本地或者遠端,此時我們就可以通過canal.instance.global.mode這個配置項,來統一的指定配置檔案的位置,避免為每個canal instance單獨指定。

其中:

spring方式:

表示所有的canal instance的配置檔案位於本地。此時,我們必須提供配置項canal.instance.global.spring.xml指定spring配置檔案的路徑。canal提供了多個spring配置檔案:file-instance.xml、default-instance.xml、memory-instance.xml、local-instance.xml、group-instance.xml。這麼多配置檔案主要是為了支援canal instance不同的工作方式。我們在稍後將會講解各個配置檔案的區別。而在這些配置檔案的開頭,我們無一例外的可以看到以下配置:

  1. <beanclass="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer"lazy-init="false">
  2. <propertyname="ignoreResourceNotFound"value="true"/>
  3. <propertyname="systemPropertiesModeName"value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!--允許system覆蓋-->
  4. <propertyname="locationNames">
  5. <list>
  6. <value>classpath:canal.properties</value>
  7. <value>classpath:${canal.instance.destination:}/instance.properties</value>
  8. </list>
  9. </property>
  10. </bean>

這裡我們可以看到,所謂通過spring方式載入canal instance配置,無非就是通過spring提供的PropertyPlaceholderConfigurer來載入canal instance的配置檔案instance.properties。

這裡instance.properties的檔案完整路徑是${canal.instance.destination:}/instance.properties,其中${canal.instance.destination}是一個變數。這是因為我們可以在一個canal server中配置多個canal instance,每個canal instance配置檔案的名稱都是instance.properties,因此我們需要通過目錄進行區分。例如我們通過配置項canal.destinations指定多個canal instance的名字

  1. canal.destinations=example1,example2

此時我們就要conf目錄下,新建兩個子目錄example1和example2,每個目錄下各自放置一個instance.properties。

canal在初始化時就會分別使用example1和example2來替換${canal.instance.destination:},從而分別根據example1/instance.properties和example2/instance.properties建立2個canal instance。

manager方式:

表示所有的canal instance的配置檔案位於遠端配置中心,此時我們必須提供配置項 canal.instance.global.manager.address來指定遠端配置中心的地址。目前alibaba內部配置使用這種方式。開發者可以自己實現CanalConfigClient,連線各自的管理系統,完成接入。

3.1.2 instanceGenerator欄位

型別為CanalInstanceGenerator。在initGlobalConfig方法中,除了建立了globalInstanceConfig例項,同時還為欄位instanceGenerator欄位進行了賦值。

顧名思義,這個欄位用於建立CanalInstance例項。這是instance模組中的類,其作用就是為canal.properties檔案中canal.destinations配置項列出的每個destination,建立一個CanalInstance例項。CanalInstanceGenerator是一個介面,定義如下所示:

  1. publicinterfaceCanalInstanceGenerator{
  2. /**
  3. *通過destination產生特定的{@linkCanalInstance}
  4. */
  5. CanalInstancegenerate(Stringdestination);
  6. }

針對spring和manager兩種instance配置的載入方式,CanalInstanceGenerator提供了兩個對應的實現類,如下所示:

instanceGenerator欄位通過一個匿名內部類進行初始化。其內部會判斷配置的各個destination的配置載入方式,spring 或者manager。

  1. instanceGenerator=newCanalInstanceGenerator(){
  2. publicCanalInstancegenerate(Stringdestination){
  3. //1、根據destination從instanceConfigs獲取對應的InstanceConfig物件
  4. InstanceConfigconfig=instanceConfigs.get(destination);
  5. if(config==null){
  6. thrownewCanalServerException("can'tfinddestination:{}");
  7. }
  8. //2、如果destination對應的InstanceConfig的mode是manager方式,使用ManagerCanalInstanceGenerator
  9. if(config.getMode().isManager()){
  10. ManagerCanalInstanceGeneratorinstanceGenerator=newManagerCanalInstanceGenerator();
  11. instanceGenerator.setCanalConfigClient(managerClients.get(config.getManagerAddress()));
  12. returninstanceGenerator.generate(destination);
  13. }elseif(config.getMode().isSpring()){
  14. //3、如果destination對應的InstanceConfig的mode是spring方式,使用SpringCanalInstanceGenerator
  15. SpringCanalInstanceGeneratorinstanceGenerator=newSpringCanalInstanceGenerator();
  16. synchronized(this){
  17. try{
  18. //設定當前正在載入的通道,載入spring查詢檔案時會用到該變數
  19. System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY,destination);
  20. instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));
  21. returninstanceGenerator.generate(destination);
  22. }catch(Throwablee){
  23. logger.error("generatorinstancefailed.",e);
  24. thrownewCanalException(e);
  25. }finally{
  26. System.setProperty(CanalConstants.CANAL_DESTINATION_PROPERTY,"");
  27. }
  28. }
  29. }else{
  30. thrownewUnsupportedOperationException("unknowmode:"+config.getMode());
  31. }
  32. }
  33. };

上述程式碼中的第1步比較變態,從instanceConfigs中根據destination作為引數,獲得對應的InstanceConfig。而instanceConfigs目前還沒有被初始化,這個欄位是在稍後將後將要講解的initInstanceConfig方法初始化的,不過由於這是一個引用型別,當initInstanceConfig方法被執行後,instanceConfigs欄位中也就有值了。目前,我們姑且認為, instanceConfigs這個Map<String, InstanceConfig>型別的欄位已經被初始化好了。

2、3兩步用於確定是instance的配置載入方式是spring還是manager,如果是spring,就使用SpringCanalInstanceGenerator建立CanalInstance例項,如果是manager,就使用ManagerCanalInstanceGenerator建立CanalInstance例項。

由於目前manager方式的原始碼並未開源,因此,我們只分析SpringCanalInstanceGenerator相關程式碼。

上述程式碼中,首先建立了一個SpringCanalInstanceGenerator例項,然後往裡面設定了一個BeanFactory。

  1. instanceGenerator.setBeanFactory(getBeanFactory(config.getSpringXml()));

其中config.getSpringXml()返回的就是我們在canal.properties中通過canal.instance.global.spring.xml配置項指定了spring配置檔案路徑。getBeanFactory方法原始碼如下所示:

  1. privateBeanFactorygetBeanFactory(StringspringXml){
  2. ApplicationContextapplicationContext=newClassPathXmlApplicationContext(springXml);
  3. returnapplicationContext;
  4. }

SpringCanalInstanceGenerator設定了BeanFactory之後,就可以通過其的generate方法獲得CanalInstance例項。

SpringCanalInstanceGenerator的原始碼如下所示:

  1. publicclassSpringCanalInstanceGeneratorimplementsCanalInstanceGenerator,BeanFactoryAware{
  2. privateStringdefaultName="instance";
  3. privateBeanFactorybeanFactory;
  4. publicCanalInstancegenerate(Stringdestination){
  5. StringbeanName=destination;
  6. //首先判斷beanFactory是否包含以destination為id的bean
  7. if(!beanFactory.containsBean(beanName)){
  8. beanName=defaultName;//如果沒有,設定要獲取的bean的id為instance。
  9. }
  10. //以預設的bean的id值"instance"來獲取CanalInstance例項
  11. return(CanalInstance)beanFactory.getBean(beanName);
  12. }
  13. publicvoidsetBeanFactory(BeanFactorybeanFactory)throwsBeansException{
  14. this.beanFactory=beanFactory;
  15. }
  16. }

首先嚐試以傳入的引數destination來獲取CanalInstance例項,如果沒有,就以預設的bean的id值"instance"來獲取CanalInstance例項。事實上,如果你沒有修改spring配置檔案,那麼預設的名字就是instance。事實上,在canal提供的各個spring配置檔案xxx-instance.xml中,都有類似以下配置:

  1. <beanid="instance"class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
  2. <propertyname="destination"value="${canal.instance.destination}"/>
  3. <propertyname="eventParser">
  4. <reflocal="eventParser"/>
  5. </property>
  6. <propertyname="eventSink">
  7. <reflocal="eventSink"/>
  8. </property>
  9. <propertyname="eventStore">
  10. <reflocal="eventStore"/>
  11. </property>
  12. <propertyname="metaManager">
  13. <reflocal="metaManager"/>
  14. </property>
  15. <propertyname="alarmHandler">
  16. <reflocal="alarmHandler"/>
  17. </property>
  18. </bean>

上面的程式碼片段中,我們看到的確有一個bean的名字是instance,其型別是CanalInstanceWithSpring,這是CanalInstance介面的實現類。類似的,我們可以想到在manager配置方式下,獲取的CanalInstance實現類是CanalInstanceWithManager。事實上,你想的沒錯,CanalInstance的類圖繼承關係如下所示:

需要注意的是,到目前為止,我們只是建立好了CanalInstanceGenerator,而CanalInstance尚未建立。在CanalController的start方法被呼叫時,CanalInstance才會被真正的建立,相關原始碼將在稍後分析。

3.1.3 instanceConfigs欄位

型別為Map<String, InstanceConfig>。前面提到初始化instanceGenerator後,當其generate方法被呼叫時,會嘗試從instanceConfigs根據一個destination獲取對應的InstanceConfig,現在分析instanceConfigs的相關初始化程式碼。

我們知道globalInstanceConfig定義全域性的配置載入方式。如果需要把部分CanalInstance配置放於本地,另外一部分CanalIntance配置放於遠端配置中心,則只通過全域性方式配置,無法達到這個要求。雖然這種情況很少見,但是為了提供最大的靈活性,canal支援每個CanalIntance自己來定義自己的載入方式,來覆蓋預設的全域性配置載入方式。而每個destination對應的InstanceConfig配置就存放於instanceConfigs欄位中。

舉例來說:

  1. //當前server上部署的instance列表
  2. canal.destinations=instance1,instance2
  3. //instance配置全域性載入方式
  4. canal.instance.global.mode=spring
  5. canal.instance.global.lazy=false
  6. canal.instance.global.spring.xml=classpath:spring/file-instance.xml
  7. //instance1覆蓋全域性載入方式
  8. canal.instance.instance1.mode=manager
  9. canal.instance.instance1.manager.address=127.0.0.1:1099
  10. canal.instance.instance1.lazy=tue

這段配置中,設定了instance的全域性載入方式為spring,instance1覆蓋了全域性配置,使用manager方式載入配置。而instance2沒有覆蓋配置,因此預設使用spring載入方式。

instanceConfigs欄位通過initInstanceConfig方法進行初始化

  1. instanceConfigs=newMapMaker().makeMap();//這裡利用GoogleGuava框架的MapMaker建立Map例項並賦值給instanceConfigs
  2. //初始化instanceconfig
  3. initInstanceConfig(properties);

initInstanceConfig方法原始碼如下:

  1. privatevoidinitInstanceConfig(Propertiesproperties){
  2. //讀取配置項canal.destinations
  3. StringdestinationStr=getProperty(properties,CanalConstants.CANAL_DESTINATIONS);
  4. //以","分割canal.destinations,得到一個數組形式的destination
  5. String[]destinations=StringUtils.split(destinationStr,CanalConstants.CANAL_DESTINATION_SPLIT);
  6. for(Stringdestination:destinations){
  7. //為每一個destination生成一個InstanceConfig例項
  8. InstanceConfigconfig=parseInstanceConfig(properties,destination);
  9. //將destination對應的InstanceConfig放入instanceConfigs中
  10. InstanceConfigoldConfig=instanceConfigs.put(destination,config);
  11. if(oldConfig!=null){
  12. logger.warn("destination:{}oldconfig:{}hasreplacebynewconfig:{}",newObject[]{destination,
  13. oldConfig,config});
  14. }
  15. }
  16. }

上面程式碼片段中,首先解析canal.destinations配置項,可以理解一個destination就對應要初始化一個canal instance。針對每個destination會建立各自的InstanceConfig,最終都會放到instanceConfigs這個Map中。

各個destination對應的InstanceConfig都是通過parseInstanceConfig方法來解析

  1. privateInstanceConfigparseInstanceConfig(Propertiesproperties,Stringdestination){
  2. //每個destination對應的InstanceConfig都引用了全域性的globalInstanceConfig
  3. InstanceConfigconfig=newInstanceConfig(globalInstanceConfig);
  4. //...其他幾個配置項與獲取globalInstanceConfig類似,不再贅述,唯一注意的的是配置項的key部分中的global變成傳遞進來的destination
  5. returnconfig;
  6. }

此時我們可以看一下InstanceConfig類的原始碼:

  1. publicclassInstanceConfig{
  2. privateInstanceConfigglobalConfig;
  3. privateInstanceModemode;
  4. privateBooleanlazy;
  5. privateStringmanagerAddress;
  6. privateStringspringXml;
  7. publicInstanceConfig(){
  8. }
  9. publicInstanceConfig(InstanceConfigglobalConfig){
  10. this.globalConfig=globalConfig;
  11. }
  12. publicstaticenumInstanceMode{
  13. SPRING,MANAGER;
  14. publicbooleanisSpring(){
  15. returnthis==InstanceMode.SPRING;
  16. }
  17. publicbooleanisManager(){
  18. returnthis==InstanceMode.MANAGER;
  19. }
  20. }
  21. publicBooleangetLazy(){
  22. if(lazy==null&&globalConfig!=null){
  23. returnglobalConfig.getLazy();
  24. }else{
  25. returnlazy;
  26. }
  27. }
  28. publicvoidsetLazy(Booleanlazy){
  29. this.lazy=lazy;
  30. }
  31. publicInstanceModegetMode(){
  32. if(mode==null&&globalConfig!=null){
  33. returnglobalConfig.getMode();
  34. }else{
  35. returnmode;
  36. }
  37. }
  38. publicvoidsetMode(InstanceModemode){
  39. this.mode=mode;
  40. }
  41. publicStringgetManagerAddress(){
  42. if(managerAddress==null&&globalConfig!=null){
  43. returnglobalConfig.getManagerAddress();
  44. }else{
  45. returnmanagerAddress;
  46. }
  47. }
  48. publicvoidsetManagerAddress(StringmanagerAddress){
  49. this.managerAddress=managerAddress;
  50. }
  51. publicStringgetSpringXml(){
  52. if(springXml==null&&globalConfig!=null){
  53. returnglobalConfig.getSpringXml();
  54. }else{
  55. returnspringXml;
  56. }
  57. }
  58. publicvoidsetSpringXml(StringspringXml){
  59. this.springXml=springXml;
  60. }
  61. publicStringtoString(){
  62. returnToStringBuilder.reflectionToString(this,CanalToStringStyle.DEFAULT_STYLE);
  63. }
  64. }

可以看到,InstanceConfig類中維護了一個globalConfig欄位,其型別也是InstanceConfig。而其相關get方法在執行時,會按照以下邏輯進行判斷:如果沒有自身沒有這個配置,則返回全域性配置,如果有,則返回自身的配置。通過這種方式實現對全域性配置的覆蓋。

3.2 準備canal server相關程式碼

  1. cid=Long.valueOf(getProperty(properties,CanalConstants.CANAL_ID));
  2. ip=getProperty(properties,CanalConstants.CANAL_IP);
  3. port=Integer.valueOf(getProperty(properties,CanalConstants.CANAL_PORT));
  4. embededCanalServer=CanalServerWithEmbedded.instance();
  5. embededCanalServer.setCanalInstanceGenerator(instanceGenerator);//設定自定義的instanceGenerator
  6. canalServer=CanalServerWithNetty.instance();
  7. canalServer.setIp(ip);
  8. canalServer.setPort(port);

上述程式碼中,首先解析了cid、ip、port欄位,其中:

cid:Long,對應canal.properties檔案中的canal.id,目前無實際用途

ip:String,對應canal.properties檔案中的canal.ip,canal server監聽的ip。

port:int,對應canal.properties檔案中的canal.port,canal server監聽的埠

之後分別為以下兩個欄位賦值:

embededCanalServer:型別為CanalServerWithEmbedded

canalServer:型別為CanalServerWithNetty

CanalServerWithEmbeddedCanalServerWithNetty都實現了CanalServer介面,且都實現了單例模式,通過靜態方法instance獲取例項。

關於這兩種型別的實現,canal官方文件有以下描述:

說白了,就是我們可以不必獨立部署canal server。在應用直接使用CanalServerWithEmbedded直連mysql資料庫。如果覺得自己的技術hold不住相關程式碼,就獨立部署一個canal server,使用canal提供的客戶端,連線canal server獲取binlog解析後資料。而CanalServerWithNetty是在CanalServerWithEmbedded的基礎上做的一層封裝,用於與客戶端通訊。

在獨立部署canal server時,Canal客戶端傳送的所有請求都交給CanalServerWithNetty處理解析,解析完成之後委派給了交給CanalServerWithEmbedded進行處理。因此CanalServerWithNetty就是一個馬甲而已。CanalServerWithEmbedded才是核心。

因此,在上述程式碼中,我們看到,用於生成CanalInstance例項的instanceGenerator被設定到了CanalServerWithEmbedded中,而ip和port被設定到CanalServerWithNetty中。

關於CanalServerWithNetty如何將客戶端的請求委派給CanalServerWithEmbedded進行處理,我們將在server模組原始碼分析中進行講解。

3.3 初始化zk相關程式碼

  1. //讀取canal.properties中的配置項canal.zkServers,如果沒有這個配置,則表示專案不使用zk
  2. finalStringzkServers=getProperty(properties,CanalConstants.CANAL_ZKSERVERS);
  3. if(StringUtils.isNotEmpty(zkServers)){
  4. //建立zk例項
  5. zkclientx=ZkClientx.getZkClient(zkServers);
  6. //初始化系統目錄
  7. //destination列表,路徑為/otter/canal/destinations
  8. zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE,true);
  9. //整個canalserver的叢集列表,路徑為/otter/canal/cluster
  10. zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE,true);
  11. }

canal支援利用了zk來完成HA機制、以及將當前消費到到的mysql的binlog位置記錄到zk中。ZkClientx是canal對ZkClient進行了一層簡單的封裝。

顯然,當我們沒有配置canal.zkServers,那麼zkclientx不會被初始化。

關於Canal如何利用ZK做HA,我們將在稍後的程式碼中進行分。而利用zk記錄binlog的消費進度,將在之後的章節進行分析。

3.4 CanalInstance執行狀態監控相關程式碼

由於這段程式碼比較長且噁心,這裡筆者暫時對部分程式碼進行省略,以便讀者看清楚整各脈絡

  1. finalServerRunningDataserverData=newServerRunningData(cid,ip+":"+port);
  2. ServerRunningMonitors.setServerData(serverData);
  3. ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(newFunction<String,ServerRunningMonitor>(){
  4. publicServerRunningMonitorapply(finalStringdestination){
  5. ServerRunningMonitorrunningMonitor=newServerRunningMonitor(serverData);
  6. runningMonitor.setDestination(destination);
  7. runningMonitor.setListener(newServerRunningListener(){....});//省略ServerRunningListener的具體實現
  8. if(zkclientx!=null){
  9. runningMonitor.setZkClient(zkclientx);
  10. }
  11. //觸發建立一下cid節點
  12. runningMonitor.init();
  13. returnrunningMonitor;
  14. }
  15. }));

上述程式碼中,ServerRunningMonitors是ServerRunningMonitor物件的容器,而ServerRunningMonitor用於監控CanalInstance。

canal會為每一個destination建立一個CanalInstance,每個CanalInstance都會由一個ServerRunningMonitor來進行監控。而ServerRunningMonitor統一由ServerRunningMonitors進行管理。

除了CanalInstance需要監控,CanalServer本身也需要監控。因此我們在程式碼一開始,就看到往ServerRunningMonitors設定了一個ServerRunningData物件,封裝了canal server監聽的ip和埠等資訊。

ServerRunningMonitors原始碼如下所示:

  1. publicclassServerRunningMonitors{
  2. privatestaticServerRunningDataserverData;
  3. privatestaticMaprunningMonitors;//<String,ServerRunningMonitor>
  4. publicstaticServerRunningDatagetServerData(){
  5. returnserverData;
  6. }
  7. publicstaticMap<String,ServerRunningMonitor>getRunningMonitors(){
  8. returnrunningMonitors;
  9. }
  10. publicstaticServerRunningMonitorgetRunningMonitor(Stringdestination){
  11. return(ServerRunningMonitor)runningMonitors.get(destination);
  12. }
  13. publicstaticvoidsetServerData(ServerRunningDataserverData){
  14. ServerRunningMonitors.serverData=serverData;
  15. }
  16. publicstaticvoidsetRunningMonitors(MaprunningMonitors){
  17. ServerRunningMonitors.runningMonitors=runningMonitors;
  18. }
  19. }

ServerRunningMonitors的setRunningMonitors方法接收的引數是一個Map,其中Map的key是destination,value是ServerRunningMonitor,也就是說針對每一個destination都有一個ServerRunningMonitor來監控。

上述程式碼中,在往ServerRunningMonitors設定Map時,是通過MigrateMap.makeComputingMap方法來建立的,其接受一個Function型別的引數,這是guava中定義的介面,其聲明瞭apply抽象方法。其工作原理可以通過下面程式碼片段進行介紹:

  1. Map<String,User>map=MigrateMap.makeComputingMap(newFunction<String,User>(){
  2. @Override
  3. publicUserapply(Stringname){
  4. returnnewUser(name);
  5. }
  6. });
  7. Useruser=map.get("tianshouzhi");//第一次獲取時會建立
  8. assertuser!=null;
  9. assertuser==map.get("tianshouzhi");//之後獲取,總是返回之前已經建立的物件

這段程式碼中,我們利用MigrateMap.makeComputingMap建立了一個Map,其中key為String型別,value為User型別。當我們呼叫map.get("tianshouzhi")方法,最開始這個Map中並沒有任何key/value的,於是其就會回撥Function的apply方法,利用引數"tianshouzhi"建立一個User物件並返回。之後當我們再以"tianshouzhi"為key從Map中獲取User物件時,會直接將前面建立的物件返回。不會回撥apply方法,也就是說,只有在第一次嘗試獲取時,才會回撥apply方法。

而在上述程式碼中,實際上就利用了這個特性,只不過是根據destination獲取ServerRunningMonitor物件,如果不存在就建立。

在建立ServerRunningMonitor物件時,首先根據ServerRunningData建立ServerRunningMonitor例項,之後設定了destination和ServerRunningListener物件,接著,判斷如果zkClientx欄位如果不為空,也設定到ServerRunningMonitor中,最後呼叫init方法進行初始化。

  1. ServerRunningMonitorrunningMonitor=newServerRunningMonitor(serverData);
  2. runningMonitor.setDestination(destination);
  3. runningMonitor.setListener(newServerRunningListener(){...})//省略ServerRunningListener具體程式碼
  4. if(zkclientx!=null){
  5. runningMonitor.setZkClient(zkclientx);
  6. }
  7. //觸發建立一下cid節點
  8. runningMonitor.init();
  9. returnrunningMonitor;

ServerRunningListener的實現如下:

  1. newServerRunningListener(){
  2. /*內部呼叫了embededCanalServer的start(destination)方法。
  3. 此處需要劃重點,說明每個destination對應的CanalInstance是通過embededCanalServer的start方法啟動的,
  4. 這與我們之前分析將instanceGenerator設定到embededCanalServer中可以對應上。
  5. embededCanalServer負責呼叫instanceGenerator生成CanalInstance例項,並負責其啟動。*/
  6. publicvoidprocessActiveEnter(){
  7. try{
  8. MDC.put(CanalConstants.MDC_DESTINATION,String.valueOf(destination));
  9. embededCanalServer.start(destination);
  10. }finally{
  11. MDC.remove(CanalConstants.MDC_DESTINATION);
  12. }
  13. }
  14. //內部呼叫embededCanalServer的stop(destination)方法。與上start方法類似,只不過是停止CanalInstance。
  15. publicvoidprocessActiveExit(){
  16. try{
  17. MDC.put(CanalConstants.MDC_DESTINATION,String.valueOf(destination));
  18. embededCanalServer.stop(destination);
  19. }finally{
  20. MDC.remove(CanalConstants.MDC_DESTINATION);
  21. }
  22. }
  23. /*處理存在zk的情況下,在Canalinstance啟動之前,在zk中建立節點。
  24. 路徑為:/otter/canal/destinations/{0}/cluster/{1},其0會被destination替換,1會被ip:port替換。
  25. 此方法會在processActiveEnter()之前被呼叫*/
  26. publicvoidprocessStart(){
  27. try{
  28. if(zkclientx!=null){
  29. finalStringpath=ZookeeperPathUtils.getDestinationClusterNode(destination,ip+":"+port);
  30. initCid(path);
  31. zkclientx.subscribeStateChanges(newIZkStateListener(){
  32. publicvoidhandleStateChanged(KeeperStatestate)throwsException{
  33. }
  34. publicvoidhandleNewSession()throwsException{
  35. initCid(path);
  36. }
  37. });
  38. }
  39. }finally{
  40. MDC.remove(CanalConstants.MDC_DESTINATION);
  41. }
  42. }
  43. //處理存在zk的情況下,在Canalinstance停止前,釋放zk節點,路徑為/otter/canal/destinations/{0}/cluster/{1},
  44. //其0會被destination替換,1會被ip:port替換。此方法會在processActiveExit()之前被呼叫
  45. publicvoidprocessStop(){
  46. try{
  47. MDC.put(CanalConstants.MDC_DESTINATION,String.valueOf(destination));
  48. if(zkclientx!=null){
  49. finalStringpath=ZookeeperPathUtils.getDestinationClusterNode(destination,ip+":"+port);
  50. releaseCid(path);
  51. }
  52. }finally{
  53. MDC.remove(CanalConstants.MDC_DESTINATION);
  54. }
  55. }
  56. }

上述程式碼中,我們可以看到啟動一個CanalInstance實際上是在ServerRunningListener的processActiveEnter方法中,通過呼叫embededCanalServer的start(destination)方法進行的,對於停止也是類似。

那麼ServerRunningListener中的相關方法到底是在哪裡回撥的呢?我們可以在ServerRunningMonitor的start和stop方法中找到答案,這裡只列出start方法。

  1. publicclassServerRunningMonitorextendsAbstractCanalLifeCycle{
  2. ...
  3. publicvoidstart(){
  4. super.start();
  5. processStart();//其內部會呼叫ServerRunningListener的processStart()方法
  6. if(zkClient!=null){//存在zk,以HA方式啟動
  7. //如果需要儘可能釋放instance資源,不需要監聽running節點,不然即使stop了這臺機器,另一臺機器立馬會start
  8. Stringpath=ZookeeperPathUtils.getDestinationServerRunning(destination);
  9. zkClient.subscribeDataChanges(path,dataListener);
  10. initRunning();
  11. }else{//沒有zk,直接啟動
  12. processActiveEnter();
  13. }
  14. }
  15. //...stop方法邏輯類似,相關程式碼省略
  16. }

當ServerRunningMonitor的start方法被呼叫時,其首先會直接呼叫processStart方法,這個方法內部直接調了ServerRunningListener的processStart()方法,原始碼如下所示。通過前面的分析,我們已經知道在存在zkClient!=null的情況,會往zk中建立一個節點。

  1. privatevoidprocessStart(){
  2. if(listener!=null){
  3. try{
  4. listener.processStart();
  5. }catch(Exceptione){
  6. logger.error("processStartfailed",e);
  7. }
  8. }
  9. }

之後會判斷是否存在zkClient,如果不存在,則以本地方式啟動,如果存在,則以HA方式啟動。我們知道,canal server可以部署成兩種方式:叢集方式或者獨立部署。其中叢集方式是利用zk來做HA,獨立部署則可以直接進行啟動。我們先來看比較簡單的直接啟動。

直接啟動:

不存在zk的情況下,會進入else程式碼塊,呼叫processActiveEnter方法,其內部呼叫了listener的processActiveEnter,啟動相應destination對應的CanalInstance。

  1. privatevoidprocessActiveEnter(){
  2. if(listener!=null){
  3. try{
  4. listener.processActiveEnter();
  5. }catch(Exceptione){
  6. logger.error("processActiveEnterfailed",e);
  7. }
  8. }
  9. }

HA方式啟動:

存在zk,說明canal server可能做了叢集,因為canal就是利用zk來做HA的。首先根據destination構造一個zk的節點路徑,然後進行監聽。

  1. /*構建臨時節點的路徑:/otter/canal/destinations/{0}/running,其中佔位符{0}會被destination替換。
  2. 在叢集模式下,可能會有多個canalserver共同處理同一個destination,
  3. 在某一時刻,只能由一個canalserver進行處理,處理這個destination的canalserver進入running狀態,其他canalserver進入standby狀態。*/
  4. Stringpath=ZookeeperPathUtils.getDestinationServerRunning(destination);
  5. /*對destination對應的running節點進行監聽,一旦發生了變化,則說明可能其他處理相同destination的canalserver可能出現了異常,
  6. 此時需要嘗試自己進入running狀態。*/
  7. zkClient.subscribeDataChanges(path,dataListener);

上述只是監聽程式碼,之後嘗試呼叫initRunning方法通過HA的方式來啟動CanalInstance。

  1. privatevoidinitRunning(){
  2. if(!isStart()){
  3. return;
  4. }
  5. //構建臨時節點的路徑:/otter/canal/destinations/{0}/running,其中佔位符{0}會被destination替換
  6. Stringpath=ZookeeperPathUtils.getDestinationServerRunning(destination);
  7. //序列化
  8. //構建臨時節點的資料,標記當前destination由哪一個canalserver處理
  9. byte[]bytes=JsonUtils.marshalToByte(serverData);
  10. try{
  11. mutex.set(false);
  12. //嘗試建立臨時節點。如果節點已經存在,說明是其他的canalserver已經啟動了這個canalinstance。
  13. //此時會丟擲ZkNodeExistsException,進入catch程式碼塊。
  14. zkClient.create(path,bytes,CreateMode.EPHEMERAL);
  15. activeData=serverData;
  16. processActiveEnter();//如果建立成功,觸發一下事件,內部呼叫ServerRunningListener的processActiveEnter方法
  17. mutex.set(true);
  18. }catch(ZkNodeExistsExceptione){
  19. //建立節點失敗,則根據path從zk中獲取當前是哪一個canalserver建立了當前canalinstance的相關資訊。
  20. //第二個引數true,表示的是,如果這個path不存在,則返回null。
  21. bytes=zkClient.readData(path,true);
  22. if(bytes==null){//如果不存在節點,立即嘗試一次
  23. initRunning();
  24. }else{
  25. //如果的確存在,則將建立該canalinstance例項資訊存入activeData中。
  26. activeData=JsonUtils.unmarshalFromByte(bytes,ServerRunningData.class);
  27. }
  28. }catch(ZkNoNodeExceptione){//如果/otter/canal/destinations/{0}/節點不存在,進行建立其中佔位符{0}會被destination替換
  29. zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination),true);
  30. //嘗試建立父節點
  31. initRunning();
  32. }
  33. }

可以看到,initRunning方法內部只有在嘗試在zk中建立節點成功後,才會去呼叫listener的processActiveEnter方法來真正啟動destination對應的canal instance,這是canal HA方式啟動的核心。canal官方文件中介紹了CanalServerHA機制啟動的流程,如下:

事實上,這個說明的前兩步,都是在initRunning方法中實現的。從上面的程式碼中,我們可以看出,在HA機啟動的情況下,initRunning方法不一定能走到processActiveEnter()方法,因為建立臨時節點可能會出錯。

此外,根據官方文件說明,如果出錯,那麼當前canal instance則進入standBy狀態。也就是另外一個canal instance出現異常時,當前canal instance頂上去。那麼相關原始碼在什麼地方呢?在HA方式啟動最開始的2行程式碼的監聽邏輯中:

  1. Stringpath=ZookeeperPathUtils.getDestinationServerRunning(destination);
  2. zkClient.subscribeDataChanges(path,dataListener);

其中dataListener型別是IZkDataListener,這是zkclient客戶端提供的介面,定義如下:

  1. publicinterfaceIZkDataListener{
  2. publicvoidhandleDataChange(StringdataPath,Objectdata)throwsException;
  3. publicvoidhandleDataDeleted(StringdataPath)throwsException;
  4. }

當zk節點中的資料發生變更時,會自動回撥這兩個方法,很明顯,一個是用於處理節點資料發生變化,一個是用於處理節點資料被刪除。

而dataListener是在ServerRunningMonitor的構造方法中初始化的,如下:

  1. publicServerRunningMonitor(){
  2. //建立父節點
  3. dataListener=newIZkDataListener(){
  4. //!!!目前看來,好像並沒有存在修改running節點資料的程式碼,為什麼這個方法不是空實現?
  5. publicvoidhandleDataChange(StringdataPath,Objectdata)throwsException{
  6. MDC.put("destination",destination);
  7. ServerRunningDatarunningData=JsonUtils.unmarshalFromByte((byte[])data,ServerRunningData.class);
  8. if(!isMine(runningData.getAddress())){
  9. mutex.set(false);
  10. }
  11. if(!runningData.isActive()&&isMine(runningData.getAddress())){//說明出現了主動釋放的操作,並且本機之前是active
  12. release=true;
  13. releaseRunning();//徹底釋放mainstem}
  14. activeData=(ServerRunningData)runningData;
  15. }
  16. //當其他canalinstance出現異常,臨時節點資料被刪除時,會自動回撥這個方法,此時當前canalinstance要頂上去
  17. publicvoidhandleDataDeleted(StringdataPath)throwsException{
  18. MDC.put("destination",destination);
  19. mutex.set(false);
  20. if(!release&&activeData!=null&&isMine(activeData.getAddress())){
  21. //如果上一次active的狀態就是本機,則即時觸發一下active搶佔
  22. initRunning();
  23. }else{
  24. //否則就是等待delayTime,避免因網路瞬端或者zk異常,導致出現頻繁的切換操作
  25. delayExector.schedule(newRunnable(){
  26. publicvoidrun(){
  27. initRunning();//嘗試自己進入running狀態
  28. }
  29. },delayTime,TimeUnit.SECONDS);
  30. }
  31. }
  32. };
  33. }

那麼現在問題來了?ServerRunningMonitor的start方法又是在哪裡被呼叫的, 這個方法被呼叫了,才能真正的啟動canal instance。這部分程式碼我們放到後面的CanalController中的start方法進行講解。

下面分析最後一部分程式碼,autoScan機制相關程式碼。

3.5 autoScan機制相關程式碼

關於autoscan,官方文件有以下介紹:

結合autoscan機制的相關原始碼:

  1. //
  2. autoScan=BooleanUtils.toBoolean(getProperty(properties,CanalConstants.CANAL_AUTO_SCAN));
  3. if(autoScan){
  4. defaultAction=newInstanceAction(){//....};
  5. instanceConfigMonitors=//....
  6. }

可以看到,autoScan是否需要自動掃描的開關,只有當autoScan為true時,才會初始化defaultAction欄位和instanceConfigMonitors欄位。其中:

其中:

defaultAction:其作用是如果配置發生了變更,預設應該採取什麼樣的操作。其實現了InstanceAction介面定義的三個抽象方法:start、stop和reload。當新增一個destination配置時,需要呼叫start方法來啟動;當移除一個destination配置時,需要呼叫stop方法來停止;當某個destination配置發生變更時,需要呼叫reload方法來進行重啟。

instanceConfigMonitors:型別為Map<InstanceMode, InstanceConfigMonitor>。defaultAction欄位只是定義了配置發生變化預設應該採取的操作,那麼總該有一個類來監聽配置是否發生了變化,這就是InstanceConfigMonitor的作用。官方文件中,只提到了對canal.conf.dir配置項指定的目錄的監聽,這指的是通過spring方式載入配置。顯然的,通過manager方式載入配置,配置中心的內容也是可能發生變化的,也需要進行監聽。此時可以理解為什麼instanceConfigMonitors的型別是一個Map,key為InstanceMode,就是為了對這兩種方式的配置載入方式都進行監聽。

defaultAction欄位初始化原始碼如下所示:

  1. defaultAction=newInstanceAction(){
  2. publicvoidstart(Stringdestination){
  3. InstanceConfigconfig=instanceConfigs.get(destination);
  4. if(config==null){
  5. //重新讀取一下instanceconfig
  6. config=parseInstanceConfig(properties,destination);
  7. instanceConfigs.put(destination,config);
  8. }
  9. if(!embededCanalServer.isStart(destination)){
  10. //HA機制啟動
  11. ServerRunningMonitorrunningMonitor=ServerRunningMonitors.getRunningMonitor(destination);
  12. if(!config.getLazy()&&!runningMonitor.isStart()){
  13. runningMonitor.start();
  14. }
  15. }
  16. }
  17. publicvoidstop(Stringdestination){
  18. //此處的stop,代表強制退出,非HA機制,所以需要退出HA的monitor和配置資訊
  19. InstanceConfigconfig=instanceConfigs.remove(destination);
  20. if(config!=null){
  21. embededCanalServer.stop(destination);
  22. ServerRunningMonitorrunningMonitor=ServerRunningMonitors.getRunningMonitor(destination);
  23. if(runningMonitor.isStart()){
  24. runningMonitor.stop();
  25. }
  26. }
  27. }
  28. publicvoidreload(Stringdestination){
  29. //目前任何配置變化,直接重啟,簡單處理
  30. stop(destination);
  31. start(destination);
  32. }
  33. };

instanceConfigMonitors欄位初始化原始碼如下所示:

  1. instanceConfigMonitors=MigrateMap.makeComputingMap(newFunction<InstanceMode,InstanceConfigMonitor>(){
  2. publicInstanceConfigMonitorapply(InstanceModemode){
  3. intscanInterval=Integer.valueOf(getProperty(properties,CanalConstants.CANAL_AUTO_SCAN_INTERVAL));
  4. if(mode.isSpring()){//如果載入方式是spring,返回SpringInstanceConfigMonitor
  5. SpringInstanceConfigMonitormonitor=newSpringInstanceConfigMonitor();
  6. monitor.setScanIntervalInSecond(scanInterval);
  7. monitor.setDefaultAction(defaultAction);
  8. //設定conf目錄,預設是user.dir+conf目錄組成
  9. StringrootDir=getProperty(properties,CanalConstants.CANAL_CONF_DIR);
  10. if(StringUtils.isEmpty(rootDir)){
  11. rootDir="../conf";
  12. }
  13. if(StringUtils.equals("otter-canal",System.getProperty("appName"))){
  14. monitor.setRootConf(rootDir);
  15. }else{
  16. //eclipsedebug模式
  17. monitor.setRootConf("src/main/resources/");
  18. }
  19. returnmonitor;
  20. }elseif(mode.isManager()){//如果載入方式是manager,返回ManagerInstanceConfigMonitor
  21. returnnewManagerInstanceConfigMonitor();
  22. }else{
  23. thrownewUnsupportedOperationException("unknowmode:"+mode+"formonitor");
  24. }
  25. }
  26. });

可以看到instanceConfigMonitors也是根據mode屬性,來採取不同的監控實現類SpringInstanceConfigMonitor或者ManagerInstanceConfigMonitor,二者都實現了InstanceConfigMonitor介面。

  1. publicinterfaceInstanceConfigMonitorextendsCanalLifeCycle{
  2. voidregister(Stringdestination,InstanceActionaction);
  3. voidunregister(Stringdestination);
  4. }

當需要對一個destination進行監聽時,呼叫register方法

當取消對一個destination監聽時,呼叫unregister方法。

事實上,unregister方法在canal 內部並沒有有任何地方被呼叫,也就是說,某個destination如果開啟了autoScan=true,那麼你是無法在執行時停止對其進行監控的。如果要停止,你可以選擇將對應的目錄刪除。

InstanceConfigMonitor本身並不知道哪些canal instance需要進行監控,因為不同的canal instance,有的可能設定autoScan為true,另外一些可能設定為false。

在CanalConroller的start方法中,對於autoScan為true的destination,會呼叫InstanceConfigMonitor的register方法進行註冊,此時InstanceConfigMonitor才會真正的對這個destination配置進行掃描監聽。對於那些autoScan為false的destination,則不會進行監聽。

目前SpringInstanceConfigMonitor對這兩個方法都進行了實現,而ManagerInstanceConfigMonitor目前對這兩個方法實現的都是空,需要開發者自己來實現。

在實現ManagerInstanceConfigMonitor時,可以參考SpringInstanceConfigMonitor。

此處不打算再繼續進行分析SpringInstanceConfigMonitor的原始碼,因為邏輯很簡單,感興趣的讀者可以自行檢視SpringInstanceConfigMonitor 的scan方法,內部在什麼情況下會回撥defualtAction的start、stop、reload方法 。

4 CanalController的start方法

而ServerRunningMonitor的start方法,是在CanalController中的start方法中被呼叫的,CanalController中的start方法是在CanalLauncher中被呼叫的。

com.alibaba.otter.canal.deployer.CanalController#start

  1. publicvoidstart()throwsThrowable{
  2. logger.info("##startthecanalserver[{}:{}]",ip,port);
  3. //建立整個canal的工作節點:/otter/canal/cluster/{0}
  4. finalStringpath=ZookeeperPathUtils.getCanalClusterNode(ip+":"+port);
  5. initCid(path);
  6. if(zkclientx!=null){
  7. this.zkclientx.subscribeStateChanges(newIZkStateListener(){
  8. publicvoidhandleStateChanged(KeeperStatestate)throwsException{
  9. }
  10. publicvoidhandleNewSession()throwsException{
  11. initCid(path);
  12. }
  13. });
  14. }
  15. //優先啟動embeded服務
  16. embededCanalServer.start();
  17. //啟動不是lazy模式的CanalInstance,通過迭代instanceConfigs,根據destination獲取對應的ServerRunningMonitor,然後逐一啟動
  18. for(Map.Entry<String,InstanceConfig>entry:instanceConfigs.entrySet()){
  19. finalStringdestination=entry.getKey();
  20. InstanceConfigconfig=entry.getValue();
  21. //如果destination對應的CanalInstance沒有啟動,則進行啟動
  22. if(!embededCanalServer.isStart(destination)){
  23. ServerRunningMonitorrunningMonitor=ServerRunningMonitors.getRunningMonitor(destination);
  24. //如果不是lazy,lazy模式需要等到第一次有客戶端請求才會啟動
  25. if(!config.getLazy()&&!runningMonitor.isStart()){
  26. runningMonitor.start();
  27. }
  28. }
  29. if(autoScan){
  30. instanceConfigMonitors.get(config.getMode()).register(destination,defaultAction);
  31. }
  32. }
  33. if(autoScan){//啟動配置檔案自動檢測機制
  34. instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
  35. for(InstanceConfigMonitormonitor:instanceConfigMonitors.values()){
  36. if(!monitor.isStart()){
  37. monitor.start();//啟動monitor
  38. }
  39. }
  40. }
  41. //啟動網路介面,監聽客戶端請求
  42. canalServer.start();
  43. }

5 總結

deployer模組的主要作用:

1、讀取canal.properties,確定canal instance的配置載入方式

2、確定canal instance的啟動方式:獨立啟動或者叢集方式啟動

3、監聽canal instance的配置的變化,動態停止、啟動或新增

4、啟動canal server,監聽客戶端請求