DataX 3.0 原始碼解析一
阿新 • • 發佈:2020-10-19
# 原始碼解析
## 基本呼叫類分析
任務啟動由python指令碼新建程序進行任務執行,後續執行由Java進行,以下將對java部分進行分
其中的呼叫原理機制。
### Engine
首先入口類為`com.alibaba.datax.core.Engine`的`main`方法,其中通過呼叫其本身的靜態方法`entry`,該方法主要針對輸入參入進行格式化以及校驗:
```java
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
```
其中需要注意執行模式是通過`RUNTIME_MODE = cl.getOptionValue("mode")`程式碼直接賦值給靜態變數的,針對命令列引數採用了`org.apache.commons`的`BasicParser`解析,針對任務的配置檔案則通過其本身的`ConfigParser`進行解析(可以支援本地和網路檔案)。
完成配置初始化後該方法將例項化本身並呼叫其`start`方法,將初始化好的配置物件傳入其中。該方法首先將型別轉換進行初始化,以保證在後續資料匯入匯出中不相容型別可以進行順利的轉換工作,具體通過`ColumnCast.bind(configuration)`方法進行繫結,其中主要針對三種類型進行初始化工作:
```java
StringCast.init(configuration);
DateCast.init(configuration);
BytesCast.init(configuration);
```
接著就是利用`LoadUtil.Bind(pluginCoonfigs)`儲存外掛,便於後續讀取外掛相關配置資訊內容。剩下就是該函式的核心流程,即判斷當前的任務執行模式,是`TaskGroup`還是`Job`模式。但通過實際分析來看基本都是`Job`模式,所以後續我們主要以`JobContainer`為切入點,另一個則為`TaskGroupContainer`。兩者均繼承自`AbstractContainer`基類,並通過呼叫他們的`start`方法進行啟動。
對於非`Standlone`模式還支援記錄任務進度情況,進行彙報的功能。具體由最後例項化的`PerfTrace`類實現。
### JobContainer
首先該類的建構函式中僅初始化了`ErrorRecordChecker`類用於檢查任務是否到達錯誤記錄限制。而主要的執行則落在了`start`方法中,細心讀者可以發現其中讀取了`job.setting.dryRun`配置引數,判斷是否需要執行預檢查(preCheck)。正常工作流程則如下所示:
1. preHandle
Job前置操作,即初始化preHandler外掛並執行其`preHandler`;
```java
AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
handlerPluginType, handlerPluginName);
// todo...
handler.preHandler(configuration);
```
1. init
初始化reader和writer,實際方法中根據讀寫外掛各自執行了對應的初始化方法,具體程式碼如下所示。
```java
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);
```
其中各方法均類似,就是讀取對應載入對應的外掛物件並呼叫外掛對應的方法進行相關配置的設定以及對應方法的初始化,選取`initJobReader`方法中的部分程式碼片段如下:
```java
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName);
// todo...
jobReader.setJobPluginCollector(jobPluginCollector);
jobReader.init();
```
1. prepare
全域性準備工作,比如odpswriter清空目標表。由於讀寫外掛的特殊性質,其方法內部主要也是執行了各型別外掛的方法來實現準備工作。
```java
this.prepareJobReader();
this.prepareJobWriter();
```
其中各自方法的差異性較小,主要就是例項化外掛然後直接呼叫其對應的`prepare`即可。
4. split
拆分`Task`,引數`adviceNumber`為建議的拆分數。除此之外我們還可以通過位元組和事務的限速來進行控制,從而決定Channel的數量。具體配置引數如下:
* `job.setting.speed.byte`:總BPS限速,如果存在值則單個Channel的BPS不能為空,通過總限速除以單個Channel限速得出Channel的需求數量;
* `core.transport.channel.speed.byte`:單個Channel的BPS限速;
* `job.setting.speed.record`:總TPS限速,如果存在則單個Channel的TPS不能為空,通過總限速除以單個Channel限速得出Channel的需求數量;
* `core.transport.channel.speed.record`:單個Channel的TPS限速;
如果兩個限速均存在則取值最少的那一個,如果兩者都沒有設定則通過`job.setting.speed.channel`引數獲取,最終決定`needChannelNumber`引數。根據得出的引數進行Reader與Writer的拆分。
```java
List readerTaskConfigs = this
.doReaderSplit(this.needChannelNumber);
int taskNumber = readerTaskConfigs.size();
List writerTaskConfigs = this
.doWriterSplit(taskNumber);
```
以上這兩種方法大同小異,只是內部讀取的外掛不同,這裡我們就以Reader為例進行說明。內部例項化好對應外掛後,通過外掛Job的split方法進行實際切分。
```java
List readerSlicesConfigs =
this.jobReader.split(adviceNumber);
```
而實際的切分則需要由外掛開發人員通過實現Job的`split`方法來滿足,該方法將返回`Configuration`列表,最終將會把reader和writer以及配置項`job.content[0].transformer`重新整合成`contentconfig`並作為變數`configuration`中Key為`job.content`的值,從而便於將其傳遞至各Task中。
```java
List contentConfig = mergeReaderAndWriterTaskConfigs(
readerTaskConfigs, writerTaskConfigs, transformerList);
```
5. schedule
完成任務的切換後將開始執行任務。由於實際任務是由`TaskGroupContainer`執行,為此我們還需要劃分對應`TaskGroup`需要執行的Task,該引數通過`core.container.taskGroup.channel`進行配置,預設為5。決定每個Group執行那些Task的則由以下方法進行決定,將直接返回對應任務組的配置引數。
```java
List taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup);
```
關於該方法的具體剖析可以跳轉到本[篇幅](./#JobAssignUtil.assignFairly)
完成任務分配後我們就需要根據執行模式決定排程器,通過這裡的原始碼可以明顯看出其`DataX 3.0`是經過了閹割,僅保留了單機執行模式。
```java
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);
```
故後續我們僅能描述單機模式下關於任務排程的工作原理。首先是排程器初始化的核心方法`initStandaloneScheduler`,其方法主要是初始化了`StandAloneJobContainerCommunicator`類用於通訊(其中collect由[ProcessInnerCollector](#ProcessInnerCollector)提供,reporter由[ProcessInnerReporter](#ProcessInnerReporter)提供),`StandAloneScheduler`則為實際排程器。具體的說明請跳轉到本[篇幅](#StandAloneScheduler)。最終將對應的配置資訊傳入排程器中進行執行就完成了。
```java
scheduler.schedule(taskGroupConfigs);
```
6.
### ProcessInnerCollector
在`AbstractScheduler`的`schedule`中通過`StandAloneJobContainerCommunicator`類呼叫了其`collect`方法,而其方法的背後則是其他類對應的方法。
```java
public Communication collect() {
return super.getCollector().collectFromTaskGroup();
}
```
該類為`ProcessInnerCollector`類,其對應的方法依然是`LocalTGCommunicationManager`靜態類其中一個靜態方法。
```java
public Communication collectFromTaskGroup() {
return LocalTGCommunicationManager.getJobCommunication();
}
```
其內部也是將之前每個TaskGroup所建立的`Communication`維護了一個靜態字典並在需要的時候進行合併。
```java
public static Communication getJobCommunication() {
Communication communication = new Communication();
communication.setState(State.SUCCEEDED);
for (Communication taskGroupCommunication :
taskGroupCommunicationMap.values()) {
communication.mergeFrom(taskGroupCommunication);
}
return communication;
}
```
### ProcessInnerReporter
在`AbstractScheduler`的`schedule`中通過`StandAloneJobContainerCommunicator`類呼叫了其`report`方法,而其方法的背後則是其他類對應的方法。
```java
public void report(Communication communication) {
super.getReporter().reportJobCommunication(super.getJobId(), communication);
LOG.info(CommunicationTool.Stringify.getSnapshot(communication));
reportVmInfo();
}
```
而Reporter物件則為`ProcessInnerReporter`類,對應的方法則是該類的`reportJobCommunication`方法,其本身也是呼叫了其他靜態類的靜態方法進行實現。
```java
public void reportJobCommunication(Long jobId, Communication communication) {
// do nothing
}
```
可以看到當前原始碼並沒有是實現輸出Job的統計資訊。
### JobAssignUtil.assignFairly
該方法首先通過Channel數量除以每個TaskGroup可以處理的Channel數量從而得出TaskGroup數量。在實際切分中考慮到Shuffle的成本,外掛開發者可以通過`reader.parameter.loadBalanceResourceMark`與`writer.parameter.loadBalanceResourceMark`來劃定每個Task的標識,從而便於在分配任務時將對應標識的Reader與Writer分配到同一個TaskGroup中,如果不存在則會自動設定一個預設的標識。
```java
String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
boolean hasLoadBalanceResourceMark = StringUtils.isNotBlank(readerResourceMark) ||
StringUtils.isNotBlank(writerResourceMark);
if (!hasLoadBalanceResourceMark) {
for (Configuration conf : contentConfig) {
conf.set(CoreConstant.JOB_READER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK, "aFakeResourceMarkForLoadBalance");
}
Collections.shuffle(contentConfig, new Random(System.currentTimeMillis()));
}
```
根據資源標識將開始將根據資源標識將對應的Task進行切換,其主要由`parseAndGetResourceMarkAndTaskIdMap`方法進行分配,其內部就是根據資源標識維護一個字典,如果是預設標識則字典僅有一個物件,所有的Task都歸屬其中。
```java
LinkedHashMap> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
```
根據標識完成分組後就需要將Task配置按照TaskGroup進行分配,以滿足呼叫的需要,這裡通過呼叫`doAssign`方法來滿足。其方法主要先獲取到按照標識分組後其中最大組的成員數量`mapValueMaxLength`,並與標識數採用進行2層迴圈將各個task配置儲存到對應分組編號的資料中。
```java
for (int i = 0; i < mapValueMaxLength; i++) {
for (String resourceMark : resourceMarks) {
if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
taskGroupIndex++;
resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
}
}
}
```
以上進行以陣列的形式進行分配,而實際需要使用`Configuration`物件,為此我們還需要將以上資訊重新組織儲存到對應的配置物件中,具體結構可以參考如下原始碼:
```java
for (int i = 0; i < taskGroupNumber; i++) {
tempTaskGroupConfig = taskGroupTemplate.clone();
tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
result.add(tempTaskGroupConfig);
}
```
上述方法雖然完成了最終的任務,但是實際每個TaskGroup所分配到的Task並不是平均的,這就導致對應的TaskGroup的Channel也是不均衡的,為了便於後期的優化,我們還需要將對應TaskGroup所需的Channel數量存入到`core.container.taskGroup.channel`配置項中。
### StandAloneScheduler
該類本身並沒有太多實質性的內容,具體的功能內容更多的在其父類`ProcessInnerScheduler`與`AbstractScheduler`中,關於該兩個類的說明將直接在本篇幅中進行概述,不新起篇章。
我們以`schedule`的呼叫順蘇為例進行說明,首先獲取用於彙報的時間間隔,分別為`core.container.job.reportInterval`與`core.container.job.sleepInterval`引數,前者為每次彙報的時間間隔,預設為30秒,後者為每次睡眠時間,即每次彙總採集的間隔時間。
由於任務的執行無法避免錯誤的出現,為了保障任務的成功執行,在每次彙報的同時還增加了額外的錯誤檢查機制,通過髒資料出現的次數與比率進行判斷,從而中止任務的繼續。
```java
errorLimit = new ErrorRecordChecker(configurations.get(0));
// to do...
errorLimit.checkRecordLimit(nowJobContainerCommunication);
```
其通過`ErrorRecordChecker`類提供,該類通過`recordLimit`檢查條數與`percentageLimit`百分比檢查任務是否到達錯誤記錄的限制,對應的限制通過讀取配置中的`job.setting.errorLimit.record`與`job.setting.errorLimit.percentage`引數。對於任務的執行最核心的當然是`startAllTaskGroup`方法了,該方法位於`ProcessInnerScheduler`類中。
該方法直接利用Java本身的`Executors.newFixedThreadPool`方法建立了分組數的執行緒池資源,然後通過將`TaskGroupContainer`物件包裝到`TaskGroupContainerRunner`物件中來進行執行。
```java
TaskGroupContainer taskGroupContainer = new TaskGroupContainer(configuration);
return new TaskGroupContainerRunner(taskGroupContainer);
```
其`TaskGroupContainerRunner`內部的`run`實際依然是呼叫了對應`TaskGroupContainer`物件的`start`方法。而關於該類的說明將會另啟篇幅進行具體說