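Building a simple data pipeline with StreamSets
This tutorial uses StreamSets Data Collector to build a simple data pipeline: it reads a file from a local directory, processes the records in two branches, and writes the processed data back to a local directory. Along the way we use data preview to help configure the pipeline and add a data alert.
This is my first attempt, so mistakes are likely; corrections are welcome.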
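0 Data preparation
Create the following three local directories and save the sample file nyc_taxi_data.csv in the origin directory: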
/<base directory>/tutorial/origin
/<base directory>/tutorial/destination
/<base directory>/tutorial/error
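Note: in this tutorial <base directory> is the StreamSets installation directory. You can keep the data elsewhere, as long as the user that starts StreamSets has read and write permission on those directories.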
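The three directories can also be created with a short script. This is only a sketch: the function name create_tutorial_dirs is made up for illustration, and a temporary directory stands in for <base directory>, which you would replace with your own path.

```python
from pathlib import Path
import tempfile


def create_tutorial_dirs(base_dir):
    """Create tutorial/origin, tutorial/destination and tutorial/error under base_dir."""
    created = []
    for name in ("origin", "destination", "error"):
        path = Path(base_dir) / "tutorial" / name
        # parents=True builds the intermediate "tutorial" directory;
        # exist_ok=True makes the call safe to repeat.
        path.mkdir(parents=True, exist_ok=True)
        created.append(path)
    return created


if __name__ == "__main__":
    # A temporary directory stands in for <base directory> in this sketch.
    with tempfile.TemporaryDirectory() as base:
        for directory in create_tutorial_dirs(base):
            print(directory)
```

The directories must be readable and writable by the user that runs Data Collector, so run the script as that user or adjust the permissions afterwards.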
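1 Configure the pipeline properties
Configuring the pipeline properties is mainly about deciding how error records are handled.
- On the main page, click Create New Pipeline.
- In the dialog box, enter a pipeline title and an optional description, select Run the pipeline on Data Collector, and click Save.
- In the properties panel below, select the Error Records tab and set the Error Records property to Write to File. Error records are then saved to a file, so they can be dealt with without stopping the pipeline.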
- Configure the Write to File properties as follows (leave anything not listed at its default):
| Write to File Property | Description |
| --- | --- |
| Directory | Directory for error record files. Enter the directory that you set up for the tutorial. We recommend: /<base directory>/tutorial/error. Note: to prevent validation errors, the directory must already exist. |
| Files Prefix | This defines a prefix for error record files. By default, the files are prefixed with "SDC" and an expression that returns the Data Collector ID, but that's more than we need here. Delete the default and enter the following prefix: err_ |
| Max File Size | For the tutorial, reduce the file size to something more manageable, such as 5 or 1 MB. |
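2 Configure the data source (origin)
The origin is the entry point of the pipeline. Configuring it means defining how to connect to the source, the type of data to process, and any other origin-specific properties.
- Choose Select Origin > Directory, or select the Directory origin in the stage library panel on the far right.
- In the properties panel below, select the Files tab and configure the following properties (leave anything not listed at its default):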
| Directory Property | Value |
| --- | --- |
| Files Directory | Directory where you saved the sample file. Enter an absolute path. We recommend: /<base directory>/tutorial/origin. |
| File Name Pattern | The Directory origin processes only the files in the directory that match the file name pattern. The tutorial sample file name is nyc_taxi_data.csv. Since the file is the only file in the directory, you can use something generic, like the asterisk wild card (*) or *.csv. If you had other .csv files in the directory that you didn't want to process, you might be more specific, like this: nyc_taxi*.csv. Or if you want to process files with prefixes for other cities, you might use *taxi*.csv. |
| Read Order | This determines the read order when the directory includes multiple files. You can read based on the last-modified timestamp or file name. Because it's simpler, let's use Last Modified Timestamp. |
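To get a feel for how the example file name patterns behave, the sketch below tries them against a few file names with Python's fnmatch module. Apart from nyc_taxi_data.csv, the file names are made up, and Python's glob matching is only an approximation of what the Directory origin does.

```python
from fnmatch import fnmatchcase


def matching_files(names, pattern):
    """Return the names that match a glob-style pattern (case-sensitive)."""
    return [name for name in names if fnmatchcase(name, pattern)]


# Only nyc_taxi_data.csv comes from the tutorial; the rest are hypothetical.
files = ["nyc_taxi_data.csv", "sf_taxi_data.csv", "notes.csv", "readme.txt"]

for pattern in ("*", "*.csv", "nyc_taxi*.csv", "*taxi*.csv"):
    print(pattern, "->", matching_files(files, pattern))
```

The more specific the pattern, the fewer stray files in the origin directory end up in the pipeline.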
- Select the Data Format tab and configure the data format properties as follows:
| Delimited Property | Description |
| --- | --- |
| Data Format | The data in the sample file is delimited, so select Delimited. |
| Delimiter Format Type | Since the sample file is a standard CSV file, use the default: Default CSV (ignores empty lines). |
| Header Line | The sample file includes a header, so select With Header Line. |
| Root Field Type | This property determines how the Data Collector processes delimited data. Use the default List-Map. This allows you to use standard functions to process delimited data. With the List root field type, you need to use delimited data functions. |
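As a rough analogy for these settings, Python's csv.DictReader also takes field names from a header line, skips empty lines, and yields each row as a map from field name to value. The sample text below is made up (only the column names payment_type and credit_card appear in this tutorial), and the sketch says nothing about how Data Collector parses files internally.

```python
import csv
import io

# Hypothetical sample: a header line, one card payment, an empty line,
# and one payment with no card number.
SAMPLE = "payment_type,credit_card\nCRD,4111111111111111\n\nCSH,\n"


def read_delimited(text):
    """Parse CSV text with a header line into a list of field-name -> value maps."""
    reader = csv.DictReader(io.StringIO(text))
    # DictReader uses the first line as field names and skips empty lines.
    return [dict(row) for row in reader]


for row in read_delimited(SAMPLE):
    print(row)
```

Because every value is reachable by field name, later stages can refer to /payment_type or /credit_card instead of a column position.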
3 Data preview
- Click the Preview icon and configure the preview as follows:
| Data Preview Property | Description |
| --- | --- |
| Preview Source | Use the default Configured Source to use the sample source data. |
| Write to Destinations and Executors | By default, this property is not selected. In general, you should keep this property clear to avoid writing data to destination systems or triggering tasks in destination systems. |
| Show Field Type | By default, this property is selected. Keep it selected to see the data type of fields in the record. |
| Remember the Configuration | Select to use these properties each time you run data preview. When you select this option, the UI enters data preview without showing this dialog box again. |
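Note: if the data cannot be previewed, check the earlier steps and resolve any issue other than Validation_0011. Validation_0011 means that the stage is not yet connected to anything downstream, which does not affect data preview.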
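4 Route data with the Stream Selector
- The Stream Selector routes records based on user-defined conditions. Records that match none of the conditions go to the default stream.
- Choose Select Processor to Connect > Stream Selector, or select the Stream Selector processor in the stage library panel, and connect it to the origin.
- On the General tab, set Required Fields to /payment_type. If the earlier data preview worked, the field name is offered in a list; if not, type it in manually.
- On the Conditions tab, configure the routing condition. Click the + icon to add a condition. Records that satisfy the condition are routed to stream 1; all others go to stream 2.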
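The routing logic can be sketched in plain Python. The condition itself is not spelled out in the text above, so the sketch assumes it tests for payment_type equal to CRD, as the error message in the Jython script in the next section suggests. The function name, the sample records, and the way records without the required field are set aside are all illustrative assumptions.

```python
def split_streams(records):
    """Route records to stream 1 (condition matched) or stream 2 (default).

    Records without the required field are collected separately, on the
    assumption that they are handed to the pipeline's error handling.
    """
    streams = {1: [], 2: []}
    missing_required = []
    for record in records:
        if "payment_type" not in record:
            missing_required.append(record)
            continue
        # Assumed condition: the payment was made by credit card.
        if record["payment_type"] == "CRD":
            streams[1].append(record)
        else:
            streams[2].append(record)
    return streams, missing_required


# Hypothetical records for illustration.
sample = [
    {"payment_type": "CRD", "credit_card": "4111111111111111"},
    {"payment_type": "CSH", "credit_card": ""},
    {"credit_card": ""},
]
streams, missing_required = split_streams(sample)
print(len(streams[1]), len(streams[2]), len(missing_required))
```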
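5 Process credit card types with Jython
- Open the Package Manager, find the Jython Evaluator processor, and click Install. When the installation finishes, restart Data Collector.
- After the restart, the Jython Evaluator is available; connect it to stream 1.
- On the Jython tab, keep the default batch-by-batch processing mode.
- In the Script text box, delete all the comments and add the following code: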
try:
    # records, output and error are provided by the Jython Evaluator.
    for record in records:
        cc = record.value['credit_card']
        if cc == '':
            # Send card payments without a card number to the error stream.
            error.write(record, "Payment type was CRD, but credit card was null")
            continue
        # Derive the card type from the leading digits of the number.
        cc_type = ''
        if cc.startswith('4'):
            cc_type = 'Visa'
        elif cc.startswith(('51','52','53','54','55')):
            cc_type = 'MasterCard'
        elif cc.startswith(('34','37')):
            cc_type = 'AMEX'
        elif cc.startswith(('300','301','302','303','304','305','36','38')):
            cc_type = 'Diners Club'
        elif cc.startswith(('6011','65')):
            cc_type = 'Discover'
        elif cc.startswith(('2131','1800','35')):
            cc_type = 'JCB'
        else:
            cc_type = 'Other'
        # Add the new field and pass the record downstream.
        record.value['credit_card_type'] = cc_type
        output.write(record)
except Exception as e:
    error.write(record, e.message)
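Note: pasted code can lose its formatting (indentation in particular) and then fail; adjust it by hand.
- If the stage reports errors at this point, the Jython installation may not have completed and some libraries may be missing. Uninstalling and reinstalling it is recommended.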
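The prefix rules in the script can be tried outside Data Collector. The sketch below repackages the same checks, in the same order, as a standalone Python 3 function; the name card_type and the rule table are introduced here for illustration, and the sample numbers are made-up test values.

```python
# Prefix rules copied from the script above, in the same order.
PREFIX_RULES = [
    (("4",), "Visa"),
    (("51", "52", "53", "54", "55"), "MasterCard"),
    (("34", "37"), "AMEX"),
    (("300", "301", "302", "303", "304", "305", "36", "38"), "Diners Club"),
    (("6011", "65"), "Discover"),
    (("2131", "1800", "35"), "JCB"),
]


def card_type(number):
    """Return the card type for a card number string, or 'Other'."""
    for prefixes, name in PREFIX_RULES:
        # str.startswith accepts a tuple and matches any of its entries.
        if number.startswith(prefixes):
            return name
    return "Other"


for sample in ("4111111111111111", "5500000000000004", "371449635398431", "9999"):
    print(sample, "->", card_type(sample))
```

Checking the rules this way makes it easier to tell a logic error from an indentation problem introduced when pasting into the Script box.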
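6 Mask credit card numbers
To keep sensitive information such as credit card numbers from being exposed, we mask the numbers with a Field Masker.
- Add a Field Masker processor after the Jython Evaluator.
- On the Mask tab of the properties panel, configure the following properties: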
| Field Masker Property | Configuration |
| --- | --- |
| Mask Type | Regular Expression |
| Regular Expression | (.*)([0-9]{4}) |
| Groups to Show | 2 |
Note: the regular expression (.*)([0-9]{4}) masks every character except the last four digits.
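The effect of the expression and of Groups to Show can be reproduced with Python's re module. This is a sketch under assumptions: the mask character x and the choice to leave non-matching values unchanged are guesses for illustration, not documented Field Masker behaviour, and the function name mask is made up.

```python
import re

PATTERN = re.compile(r"(.*)([0-9]{4})")


def mask(value, groups_to_show=(2,), mask_char="x"):
    """Mask every capture group except those listed in groups_to_show."""
    match = PATTERN.fullmatch(value)
    if match is None:
        # Assumption: values that do not match are returned unchanged.
        return value
    parts = []
    for index in range(1, PATTERN.groups + 1):
        text = match.group(index)
        if index in groups_to_show:
            parts.append(text)
        else:
            parts.append(mask_char * len(text))
    return "".join(parts)


print(mask("4111111111111111"))
```

Group 1 is greedy but must leave four digits for group 2, so group 2 always captures the last four digits of the value.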
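7 Write data to the destination
Here we write the data to a local file.
- Connect a Local FS destination after the Field Masker.
- On the Output Files tab, configure the following properties: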
| Local FS Property | Configuration |
| --- | --- |
| Files Prefix | Defines a prefix for output file names. By default, the files are prefixed with "SDC" and an expression that returns the Data Collector ID, but that's more than we need here. Let's simplify and use "out_" instead. |
| Directory Template | By default, the directory template includes datetime variables to create a directory structure for output files. This is intended for writing large volumes of data. Since we only have the sample file to process, we don't need the datetime variables. Go ahead and delete the default and enter the directory where you want the files to be written. We suggest: /<base directory>/tutorial/destination. |
| Max File Size (MB) | For the tutorial, let's lower the file size to something manageable, like 5 or 1. |
- On the Data Format tab, configure the following:
| Delimited Property | Configuration |
| --- | --- |
| Data Format | Delimited |
| Header Line | With Header Line |
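8 Add the matching field with the Expression Evaluator
The Jython Evaluator script adds a new field to records on the credit payment branch. So that all records have the same structure when they reach the destination, we use an Expression Evaluator to add the same field on the non-credit branch. First, we use data preview to check how the Jython Evaluator adds the credit card type to a record.
- Click the Preview icon to preview the data. If an error is reported, check its cause.
- Click the Jython Evaluator to see its input and output data. The output data has an extra field, credit_card_type.
- Close the preview, add an Expression Evaluator, and connect it to the second output of the Stream Selector.
- On the Expressions tab, configure the following properties: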
| Output Field | Expression |
| --- | --- |
| /credit_card_type | n/a |
- Connect the Expression Evaluator to the Local FS destination.
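What the Expression Evaluator contributes here can be sketched in a few lines: it sets credit_card_type to the constant n/a, so records from both branches end up with the same fields. The function name and the sample records are hypothetical.

```python
def add_default_card_type(record, default="n/a"):
    """Return a copy of the record with credit_card_type set to a constant."""
    updated = dict(record)
    updated["credit_card_type"] = default
    return updated


# Hypothetical records, one from each branch.
credit_branch = {"payment_type": "CRD", "credit_card": "xxxxxxxxxxxx1111",
                 "credit_card_type": "Visa"}
other_branch = add_default_card_type({"payment_type": "CSH", "credit_card": ""})

# Both records now expose the same set of fields.
print(sorted(credit_branch) == sorted(other_branch))
```

With identical field sets, the header line written by the destination applies to every row in the output file.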
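9 Create a data rule and alert
Before running the basic pipeline, let's add a data rule and an alert. Data rules are user-defined rules that inspect the data moving between two stages. They are a powerful way to look for outliers and anomalous data. Data rules and alerts require a detailed understanding of the data passing through the pipeline; for more general pipeline monitoring, use metric rules and alerts.
- Between the Stream Selector and the Jython Evaluator, click the icon on the connection, then click Add in the panel below.
- In the data rule dialog box, configure the following: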
| Data Rule Property | Description |
| --- | --- |
| Label | Missing Card Numbers |
| Condition | ${record:value("/credit_card") == ""} |
| Sampling Percentage | 35 |
| Alert Text | At least 10 missing credit card numbers! |
| Threshold Value | 10 |
- Click Save and tick Active.
10 Run
Click the Start button to run the pipeline.
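The idea behind this rule can be sketched as follows: sample a percentage of the records passing between the two stages, count those with an empty credit_card, and raise the alert once the count reaches the threshold. The function name is hypothetical, and both the sampling method and the "at least" comparison are assumptions taken from the alert text rather than from Data Collector's implementation.

```python
import random


def missing_card_alert(records, sampling_pct=35, threshold=10, rng=None):
    """Return True if enough sampled records have an empty credit_card."""
    rng = rng or random.Random()
    matches = 0
    for record in records:
        # Keep roughly sampling_pct percent of the records.
        if rng.random() * 100 >= sampling_pct:
            continue
        if record.get("credit_card") == "":
            matches += 1
    # Assumption: the alert fires at "at least" the threshold value.
    return matches >= threshold


# Hypothetical batch: 12 records without a card number, 20 with one.
batch = [{"credit_card": ""}] * 12 + [{"credit_card": "4111111111111111"}] * 20
print(missing_card_alert(batch, sampling_pct=100))
```

Because only a sample of the records is inspected, the number of matches seen by the rule is normally lower than the true number of missing card numbers in the data.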