1. 程式人生 > 其它 >datax 匯入資料中文亂碼_Datax的一次填坑經歷

datax 匯入資料中文亂碼_Datax的一次填坑經歷

技術標籤:datax 匯入資料中文亂碼

2833305390597b71e46d53b6e9bccbe8.png
使用Datax進行兩個叢集間的資料同步,在讀取HDFS資料時,會出現資料丟失問題,本文針對資料丟失問題做出了分析以及對應解決方案,希望幫助大家在使用Datax過程中避免該問題的出現!

問題描述

最近在使用Datax進行兩個叢集間的資料同步,將老叢集(Hive)資料同步到新叢集(Phoenix)中,由於兩個叢集的大資料節點IP不同,老叢集有三個IP段(10、20、30)採用的是30段(大資料專用網斷),而新叢集是採用的20網斷,所以無法通過Datax連結老叢集HDFS路徑,當然這個並不是問題重點。

第一次同步的時候資料是全部同步到了新叢集,然而,因為業務關係某個表需要新增三個欄位,之後表資料又重新構建了一遍,我們稱之為info表吧,然後再次執行指令碼將info表同步一下,本來是5000W+的資料,同步完之後缺失了700W+

請忽略讀寫失敗總數,這個不是問題關鍵,這個是由於欄位長度超出了Phoenix表配置的長度,並非Datax問題。

注意看讀出記錄總數:4442W,而我實際資料表中是有5152W,少了700W+

之後重試了兩次,問題依舊!

問題分析

匯入成功了4442w,剩餘700w沒有匯入,可以先檢視這未成功匯入的700w資料是否有資料問題,之後進行了以下操作,最終定位到問題所在

步驟一

將新叢集的HDFS資料,匯入到hive中檢視資料量是否缺少,發現將資料同步到hive之後,資料量與老叢集是保持一致,這裡基本可以斷定資料本身是沒有問題的

步驟二

查詢未成功匯入到phoenix的資料資訊,先根據某一個欄位的groupby資料量檢視那個條件的資料量少且資料丟失了,從這個欄位條件入手,然後找到了100+條資料未成功匯入到phoenix,更加神奇的是這100+條資料,在HDFS中屬於同一個檔案塊000676_0,同時這100+條資料在塊中是連續的(這也是一個問題)

然而這100條資料的上一條資料是在phoenix中可以查詢到的,

所以將這100條資料單獨抽取出來放在HDFS塊中,然後單獨的進行同步,在啟動同步之後,發現日誌中的異常如下:

提示,讀取的列越界,原始檔改行有36列,您嘗試讀取第37列

將該條資料查出來然後在原生代碼split一下,發現列數果然不對(在datax中的json檔案中配置的是39列,實際也是39列),然後將第一行資料刪掉,再次同步一下,發現數據同步進去了。那麼在這裡發現了一個問題就是當某一個檔案塊中其中一條資料讀取解析異常了,那麼讀取到的這批資料就都會異常(程式碼裡面是批量讀取,批量解析的)

步驟三

將Datax程式碼clone下來研究了一下,這裡要提及一句我們phoenix使用的是5.0對應hbase2.0版本,datax也是在我們同步資料前的16天提交支援了hbase20xsqlreader

hbase20xsqlwrite

先從hbase20xsqlwrite 包程式碼研究,發現是從RecordReceiver接收器中獲取到的資料,而最後讀出記錄總數:44426102則是記錄了一共從RecordReceiver接收到的資料條數。所以問題沒有出在write這一塊,基本判斷應該是出在了hdfsreader

轉而研究一下hdfsreader的程式碼,類結構很簡單,只需要直接看HdfsReader類即可。裡面的邏輯也很簡單,在這裡就不過多的解釋了。

從上面warn資訊裡面看,是UnstructuredStorageReaderUtil這個類發出的警告,具體的包名為:

com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil ;

發現是該類的transportOneRecord(xx) 函式裡面報的異常,然後向上追溯到了doReadFromStream函式

doReadFromStream(BufferedReader reader, String context,Configuration readerSliceConfig, RecordSender recordSender,TaskPluginCollector taskPluginCollector)

這裡面可以看到不管讀取的檔案是TEXT型別還是CSV型別的,都是按照CSV進行解析的。

問題定位

資料解析之後列的個數的確與實際的不符合,之後檢視資料,發現出問題的資料中有幾串連續的空的,所以資料在解析的時候將空的給過濾了,比如[1,2,,,,6]解析得到的是[1,2,6],所以才會出現列越界的問題。

這裡需要修改一下程式碼:

csvReader = new CsvReader(reader);
csvReader.setDelimiter(fieldDelimiter);
setCsvReaderConfig(csvReader);
String[] parseRows;
while ((parseRows = UnstructuredStorageReaderUtil.splitBufferedReader(csvReader)) != null) {    
UnstructuredStorageReaderUtil.transportOneRecord(recordSender,column, parseRows, nullFormat, taskPluginCollector);
}

修改為:

String line = null;
while ((line = reader.readLine()) != null) {
String[] parseRows = line.split(delimiterInStr, -1);
UnstructuredStorageReaderUtil.transportOneRecord(recordSender,column, parseRows, nullFormat, taskPluginCollector);   
}

同時要在finally中將CsvReader.close 去掉,試圖設定csvReader的引數實現空值保留,但是均無效,如果您有更好的辦法可以修改csvreader的話也可以。這樣儘量保證原始碼的味道!但是不知道是否能保證批次資料裡面一條失敗是否會導致後面的不成功。

將程式碼改成上面這樣同時也避免了這樣的一個問題。一行一行的讀取,然後傳送給write,由write去決定接受多少條進行寫入!

至此,再次打包編譯,再次執行同步指令碼,資料完全同步過去,沒有異常的資料!

問題浮現

  • 在讀取HDFS資料時,沒有對空串進行處理,導致讀到的列出與配置的列數不一致
  • 當讀到的批次資料通過csvreader進行解析時,有一條失敗其他條也並沒有傳送給write接收器

問題引申

Datax到同步資料的時候,有一個髒資料的概念,比如這次在同步資料時候,會有一些髒資料的問題發生,plugin的處理方式時,如果批次裡面有一條出現了問題,那麼就會將這批次資料進行迴圈操作,找出出問題的那一條,加入髒資料處理任務裡面,然後髒資料任務是將任務裡面的資料重試三次,如果三次都失敗就丟掉了!

Datax本身框架是可以將髒資料本地輸出或者集中式彙報的,只是plugin在write的時候直接將異常拋給了髒資料任務,而沒有單獨做處理,所以這塊也需要做一些修改,將髒資料統一寫入指定檔案中!需要在json裡面加一個配置

此問題列為第三個問題!

問題總結

DataX本身是一個很好的支援異構資料同步的框架,而且具備很好的靈活擴充套件性,如果不修改程式碼的情況下,從資料角度出發,通過採取空值轉為NULL或者其它字元也可以解決該問題。

這裡通過修改原始碼,將字串分割處保留空值內容,最終解決問題。

感謝關注“碼農星球”。本文版權屬於“碼農星球”。我們提供諮詢和培訓服務,關於本文有任何困惑,請關注並聯系我們。