1. 程式人生 > >使用KETTLE從mysql同步增量資料到oracle

使用KETTLE從mysql同步增量資料到oracle

初次使用ETL工具抽取並同步資料,搜尋之後決定使用kettle,使用後感覺很方便。

本次是基於一個很小的需求,需要把老系統的mysql資料在一段新老系統共存時期內產生的資料實時傳輸到新系統oracle中,因為實時性要求不算高,所以我沒有做觸發器這些對接,只單純的使用kettle做了一個抽取轉換傳輸,定時執行。下面記錄一下本次的操作,並寫一下自己遇到的坑。

老系統mysql表很大,本次基於一個小的需求,只需要抽取其中的兩個欄位同步傳輸。兩個欄位均是varchar型別,相對比較簡單。我嘗試過傳輸mysql的int(11)和oracle的number,發現需要把oracle的number改為number(10),二者才能對的上號。

工具:kettle的pdi-ce-7.0.0.0-25,可去官網下載;mysql,oracle

思路:先將mysql老資料和oracle同步後的資料都查出來,轉換欄位統一整合交給kettle處理,kettle會依據關鍵欄位和時間戳來判斷來自mysql的資料對oracle來講,是新增、更新、刪除還是無任何操作,並分別標註標識位,後續一步步判斷標識位,最後轉換成oracle欄位,插入/更新/刪除資料庫資料。

整個流程使用kettle分為兩部分,一部分是一個轉換,是流程執行的主要戰場;另一個是一個作業,用來迴圈執行上一個轉換,達到定時執行的效果。

轉換步驟圖:

作業流程圖:

作業流程圖很簡單,主要是迴圈定時執行轉換,忽略不計。本次主要講的是轉換。

轉換首先要建立資料庫連線,最後再講。

步驟一:從mysql和oracle中查詢所有資料,傳送給下一步。

mysql是每次同步的源頭,oracle是每次同步的目標。之所以兩處都要查出來,是因為要在下一步中比對二者的資料,判斷哪些是增量資料。網站速度測試

B2BTest節點和SROTest節點:

從二者查出來的資料,統一歸整成兩個欄位,TASKNO和SAPNO,然後推入下一個節點。

步驟二:合併上一步的記錄,並分析增量資料,ETL會自動給每條資料都打上標記flagfield

關鍵欄位指的是用來分析增量資料的依據性欄位,資料欄位指的是所有需要合併整理的欄位,標記打在flagfield上。

因為是從Mysql同步到oracle,所以舊資料來源選擇oracle的,新資料來源選擇mysql的

步驟三:將標記和資料進一步處理,對映一下增量資料標記

kettle會自動在上一步打上標記,預設值是 deleted、new、changed、identical(什麼也不做的意思,實際是打上null標記)。我們可以對映成我們自己的標記,用於下一步的處理。

步驟四:第一次開始過濾增量標記,開始第一步分支流程處理。

本次過濾是將flagfield標記為null的,也就是identical指代的標記的資料,全部扔到空操作中。這些資料毫無變化,所以不必做任何操作。

需要下一步處理的變化資料,全都丟到下一步的獲取系統時間中。

步驟五:獲取系統時間

獲取系統時間的目的是給資料打上時間戳並存入目標資料庫,如果不方便存入目標資料庫,放入一箇中間表也行的。反正下次整理資料的時候要能搞到這個值。這裡也是我不明白的一點,我並沒有從目標資料庫中查詢這個時間戳,ETL如何找到這個時間戳,並知道哪些是該增加還是不該,哪些是該更新還是不該的。這個沒有想明白,因為如果不加系統時間,你會發現ETL會全量刪除,全量增加目標資料庫的資料。加上這個時間,就會少量更新、刪除、新增。如果有誰能看到這篇日誌記錄,煩請告知。

步驟六:第二次過濾增量資料標記。

本次會分離需要新增的資料出來,交給後續處理入庫;更新和刪除的資料,需要繼續下一步的過濾。

上圖有兩步,從flagfield中過濾出來需要新增的資料,然後拋給分支“準備插入目標資料庫”,在這個子流程節點,會將流中的欄位(欄位名稱),轉換成資料庫中的欄位(改名稱成),有兩個欄位flagfield和UPDATE_TIME無需轉換,本來就是這個欄位,所以無需新增“改名成”列。

之後就是插入資料庫。這裡有個坑,不要使用“插入”操作功能,要使用表輸出。不知道為什麼,使用插入操作功能,總會出現少量資料的誤差。

步驟七:第三次過濾增量資料標記

本次過濾的是剩下的更新和刪除,這兩種標記的資料均會被推入資料庫中。

在後面就是更新和刪除資料庫了:

更新資料庫:

刪除資料庫:

以上就是一個流程的執行,如果要迴圈執行,則要開啟一個作業,呼叫轉換,設定定時迴圈的時間條件即可。

資料庫的建立:

資料庫的建立比較簡單,需要將對應的連線jar放入目錄下,百度一搜一大堆。只是在oracle上有點坑就是了。mysql連線比較簡單,忽略不講,oracle裡,資料庫名稱實際指的是資料庫對應的 sid,可以到oracle裡查詢,如果沒有許可權,建議你從資料庫名稱開始,後面加0、1、2等,基本上都會試出來。