
StreamSets Learning (1): Pipeline Concepts and Design

1. Merging and Branching Data Streams

2. Dropping Unwanted Records

  (1) Required fields can be defined on processor, executor, and most destination stages. If a record does not include all of the required fields, it is passed to error record handling.

  (2) Preconditions: a record must satisfy all preconditions before it can enter the stage for processing. Preconditions can be defined on processor, executor, and most destination stages, and precondition expressions can use functions, variables, and runtime properties.

For example, use an expression like the following to allow only records whose COUNTRY field is 'US' into the stage:

${record:value('/COUNTRY') == 'US'}
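As a further illustration that preconditions can also use functions and runtime properties (a hedged sketch: /STATE is a placeholder field path and allowed.country is a hypothetical runtime property name):

${record:exists('/STATE')}

${record:value('/COUNTRY') == runtime:conf('allowed.country')}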

3. Error Record Handling

Error record handling can be defined at both the stage level and the pipeline level. A stage can also be configured not to handle its error records itself and instead pass them to the pipeline's error handling.

Stage-level error handling takes precedence over pipeline-level error handling.

Note: records missing required fields do not enter the stage; they are handled directly by the pipeline's error handling.

Common error handling options:

Pipeline error handling

1. Discard: drop the error records.
2. Write to Another Pipeline: requires that you build an SDC RPC origin pipeline yourself to receive the error records.
3. Write to File: write the error records to the file system.
4. Write to Kafka: write the error records to Kafka.

Stage error handling

1. Discard: drop the error records.
2. Send to Error: pass the error records to pipeline error handling.
3. Stop Pipeline: stop the pipeline and log information about the error records; a pipeline stopped this way shows as an error in the pipeline history. Note: cluster mode does not support this option.

4. Processing Changed Data

StreamSets can capture changes in data, such as inserts, updates, and deletes (CRUD operations).

1. Common CDC-enabled stages:

  JDBC Query Consumer for Microsoft SQL Server
  MySQL Binary Log
  Oracle CDC Client
  PostgreSQL CDC Client
  SQL Parser
  SQL Server CDC Client
  SQL Server Change Tracking

2. CRUD-enabled stages:
  JDBC Tee processor
  JDBC Producer destination

Processing the data: because change logs store record data in different formats, the JDBC Tee processor and JDBC Producer destination can decode most change log formats and generate record data from the original change log. When you use other CRUD-enabled destinations, you need to add additional stages to process the data format.

For capturing data changes from MySQL, see: https://streamsets.com/documentation/controlhub/latest/help/pdesigner/datacollector/UserGuide/Pipeline_Design/CDC-Overview.html#concept_apw_l2c_ty

In contrast, the MySQL Server binary logs read by the MySQL Binary Log origin provide new or updated data in a New Data map field and changed or deleted data in a Changed Data map field. You might want to use the Field Flattener processor to flatten the map field with the data that you need, and a Field Remover to remove any unnecessary fields.

For details on the format of generated records, see the documentation for the CDC-enabled origin.

Typical use cases:

You can use CDC-enabled origins and CRUD-enabled destinations in pipelines together or individually. Here are some typical use cases:

CDC-enabled origin with CRUD-enabled destinations

You can use a CDC-enabled origin and a CRUD-enabled destination to easily process changed records and write them to a destination system.

For example, say you want to write CDC data from Microsoft SQL Server to Kudu. To do this, you use the CDC-enabled JDBC Query Consumer origin to read data from a Microsoft SQL Server change capture table. The origin places the CRUD operation type in the sdc.operation.type header attribute, in this case: 1 for INSERT, 2 for DELETE, 3 for UPDATE.

You configure the pipeline to write to the CRUD-enabled Kudu destination. In the Kudu destination, you can specify a default operation for any record with no value set in the sdc.operation.type attribute, and you can configure error handling for invalid values. You set the default to INSERT and you configure the destination to use this default for invalid values. In the sdc.operation.type attribute, the Kudu destination supports 1 for INSERT, 2 for DELETE, 3 for UPDATE, and 4 for UPSERT.

When you run the pipeline, the JDBC Query Consumer origin determines the CRUD operation type for each record and writes it to the sdc.operation.type record header attribute. And the Kudu destination uses the operation in the sdc.operation.type attribute to inform the Kudu destination system how to process each record. Any record with an undeclared value in the sdc.operation.type attribute, such as a record created by the pipeline, is treated like an INSERT record. And any record with an invalid value uses the same default behavior.
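As an illustration of how this header attribute can be used elsewhere in a pipeline, a precondition like the following (a sketch; header attribute values are strings, read with record:attribute) would pass only INSERT and UPDATE records into a stage:

${record:attribute('sdc.operation.type') == '1' || record:attribute('sdc.operation.type') == '3'}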

CDC-enabled origin to non-CRUD destinations

If you need to write changed data to a destination system without a CRUD-enabled destination, you can use an Expression Evaluator or scripting processor to move the CRUD operation information from the sdc.operation.type header attribute to a field, so the information is retained in the record.

For example, say you want to read from Oracle LogMiner redo logs and write the records to Hive tables with all of the CDC information in record fields. To do this, you'd use the Oracle CDC Client origin to read the redo logs, then add an Expression Evaluator to pull the CRUD information from the sdc.operation.type header attribute into the record. Oracle CDC Client writes additional CDC information such as the table name and scn into oracle.cdc header attributes, so you can use expressions to pull that information into the record as well. Then you can use the Hadoop FS destination to write the enhanced records to Hive.
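A minimal sketch of such Expression Evaluator output field expressions (the output field names /operation and /table are hypothetical; oracle.cdc.table is one example of the oracle.cdc header attributes mentioned above):

/operation: ${record:attribute('sdc.operation.type')}

/table: ${record:attribute('oracle.cdc.table')}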

Non-CDC origin to CRUD destinations

When reading data from a non-CDC origin, you can use the Expression Evaluator or scripting processors to define the sdc.operation.type header attribute.

For example, say you want to read from a transactional database table and keep a dimension table in sync with the changes. You'd use the JDBC Query Consumer to read the source table and a JDBC Lookup processor to check the dimension table for the primary key value of each record. Then, based on the output of the lookup processor, you know whether there was a matching record in the table. Using an Expression Evaluator, you set the sdc.operation.type record header attribute: 3 (UPDATE) for records that had a matching record, and 1 (INSERT) for new records.
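A minimal sketch of that header attribute expression, assuming the JDBC Lookup writes the matched primary key into a hypothetical field named /pk (NULL when no match is found):

sdc.operation.type: ${record:value('/pk') == NULL ? '1' : '3'}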

When you pass the records to the JDBC Producer destination, the destination uses the operation in the sdc.operation.type header attribute to determine how to write the records to the dimension table.

 

5. Control Character Removal

You can use several stages to remove control characters, such as escape characters or end-of-transmission characters, from data. Remove control characters to avoid creating invalid records. When Data Collector removes control characters, it removes ASCII character codes 0-31 and 127, with the following exceptions:

  • 9 - Tab
  • 10 - Line feed
  • 13 - Carriage return

For the origins that support the Ignore Ctrl Characters property, see: https://streamsets.com/documentation/controlhub/latest/help/pdesigner/datacollector/UserGuide/Pipeline_Design/ControlCharacters.html

6. Development Stages

You can use several development stages to help develop and test pipelines.

Reference: https://streamsets.com/documentation/controlhub/latest/help/pdesigner/datacollector/UserGuide/Pipeline_Design/DevStages.html#concept_czx_ktn_ht

7. Configuring a Test Origin

Key point: a test origin is configured the same way as a regular origin. The detailed steps are as follows.

Configuring a Test Origin

Configure a test origin in the pipeline properties. When using Control Hub, you can also configure a test origin in the pipeline fragment properties.

  1. On the General tab of the pipeline or fragment properties, select the origin type that you want to use.

    You can select any available origin type.

  2. On the Test Origin tab, configure the origin properties.

    Origin properties for the test origin are the same as for real origins, with all properties displaying on a single tab.

    For details about origin configuration, see "Configuring an <origin type> Origin" in the Origins chapter. For example, for help configuring a Directory test origin, see Configuring a Directory Origin.

Using a Test Origin in Data Preview

To use a configured test origin in data preview, configure the preview configuration options.

  1. Click the Data Preview icon to start data preview.
  2. In the Preview Configuration dialog box, set the Preview Source property to Test Origin, then configure the rest of the data preview properties as needed.

    For more information about using data preview, see Data Preview Overview.

8. Understanding Pipeline States

The states related to pipeline execution generally cover starting, running, stopping, and editing a pipeline.

For details, see: https://streamsets.com/documentation/controlhub/latest/help/pdesigner/datacollector/UserGuide/Pipeline_Maintenance/PipelineStates-Understanding.html