1. 程式人生 > >使用C#和.NET Core的ETL作業

使用C#和.NET Core的ETL作業

目錄

介紹

ETLBox元件

概述控制流任務

ADO.NET——舊方式

使用控制流任務的行計數

為什麼不是Entitiy Framework

概述資料流

源元件

轉換

緩衝

非阻塞和阻塞轉換

目標元件

一個簡單的資料流

示例

環境

開始編碼

新增nlog.config

執行專案

一個簡單的etl管道

再次執行

完整程式碼


ETLBox允許您使用C#編寫ETL作業,並且是SSIS的真正替代品。由於它基於.NET core,您甚至可以在任何平臺上執行它。

介紹

此示例將概述ETLBox的基本概念。它向您展示瞭如何使用純

C#程式碼編寫自己的ETL作業。示例非常簡單——它向您介紹了框架的基本概念。

示例程式碼將在.NET核心上執行,並符合.NET Standard 2.0。您還需要啟動並執行SQL Server。最新版本的ETLBox可以通過nuget下載。

如果您正在尋找更多示例和詳細資訊,請參閱專案主頁——https://etlbox.net

ETLBox元件

ETLBox分為兩個主要元件:控制流任務資料流任務。控制流部分中的某些任務僅用於記錄目的。本文將向您概述兩者,然後向您展示如何編寫簡單的ETL作業。

概述控制流任務

控制流任務可以拆分為“ 常規 ”任務和“ 

記錄 ”任務。控制流任務駐留在ALE.ETLBox.ControlFlow名稱空間中——記錄任務駐留在ALE.ETLBox.Logging名稱空間中。

控制流任務是一組全面的任務,用於管理、更改或查詢資料庫。只需一行程式碼,您就可以在資料庫中建立表或觸發SQL。如果您在使用ADO.NET之前曾經這樣做過,那麼您可能會發現有一些樣板程式碼需要一遍又一遍地編寫。控制流任務背後的想法是你不必一次又一次地編寫相同的程式碼,例如,只是為了做一些微不足道的事情,比如開啟連線和計算表中的行。這應該只用一行程式碼就可以了。

ADO.NET——舊方式

例如,在具有經典ADO.NET連線的表上建立連線和執行簡單行計數的程式碼如下所示:

string connectionString = "Data Source=.; Database=Sample; Integrated Security=SSPI";
using (SqlConnection con = new SqlConnection(connectionString))
{
   SqlCommand cmd = new SqlCommand("select count(*) from dbo.tbl", con);
   con.Open();
   int numrows = (int)cmd.ExecuteScalar();   
}

使用控制流任務的行計數

現在讓我們看一下如何使用控制流任務(Control Flow Tasks)庫進行行計數。

首先,我們需要與控制流任務(Control Flow Tasks)建立連線。您只需設定一次資料庫連線字串,如下所示:

ControlFlow.CurrentDbConnection = 
new SqlConnectionManager(new ConnectionString
       ("Data Source=.; Database=Sample; Integrated Security=SSPI""));

如果在執行任務時沒有傳遞其他連線,則連線將儲存在靜態屬性中並由所有後續任務使用。

現在,您可以使用 RowCountTask來查詢表中的行數,僅使用一行程式碼。

int count = RowCountTask.Count("dbo.tbl");

在內部,開啟ADO.NET連線(使用預設的ADO.NET連線池),並且select count(*) from dbo.tbl語句在資料庫上執行語句。結果將從RowCountTask返回。

為什麼不是Entitiy Framework

ETLBox旨在用作ETL物件庫。因此,使用者通常處理大資料,某種資料倉庫結構,並用於完全控制資料庫。藉助ADL.NET的基礎功能——ETLBox使用它——您可以完全訪問資料庫,並且基本上可以執行您在伺服器上使用SQL所做的任何事情。由於EF(實體框架)是一種高度複雜的ORM工具,它的缺點是您只能在EF允許您執行的資料庫上執行操作。但由於EF沒有包含SQL Server上的SQLADO.NET的所有可能性,因此實體框架通常不是建立ETL作業的好選擇。對於其他ORM工具也是如此。

概述資料流

你有一些資料——儲存在一些檔案,一個表或其他地方。現在,您要定義一個獲取此資料的管道,將其動態轉換並將其寫入目標(這可能又是資料庫,檔案或其他位置)。在抽象層面上,這可以看作是ETL過程(ETL = ExtractTransformLoad)。資料流元件將允許您定義自己的ETL作業。所有資料流任務都駐留在ALE.ETLBox.DataFlow名稱空間中。

源元件

所有資料流管道至少需要一個或多個源。源基本上是可以從某個地方讀取資料的所有內容(例如,CSV檔案或資料庫表),然後將這些資料釋出到管道中。所有源都應該能夠非同步讀取資料。這意味著,當元件從源讀取資料時,它同時將已處理的資料傳送到連線到源的元件。目前有兩個內建資料來源:CSVSourceDBSource。如果您需要其他源元件,可以擴充套件CustomSource.

一旦源開始讀取資料,它將開始向其連線的元件傳送資料。這些元件可以是轉換或目標。

轉換

轉換始終至少有一個輸入和一個輸出。輸入可以連線到其他轉換或源,輸出也可以連線到其他轉換或目標。轉換元件的目的是從其輸入中獲取資料並將轉換後的資料釋出到其輸出。這是逐行完成的。只要輸入中有任何資料,轉換就會開始並將結果釋出到輸出。

緩衝

每次轉型都會有一個輸入。如果連線到輸入釋出資料的元件的處理速度比轉換更快,那麼緩衝區將儲存此資料,直到轉換可以繼續進行下一項。這允許源儘可能快地讀取,允許已讀取的資料緩衝在記憶體中——因此轉換將始終準備好處理一些資料。

非阻塞和阻塞轉換

轉換可以是阻塞也可以是非阻塞。

一旦在輸入緩衝區中找到某些內容,非阻塞轉換就會開始處理資料。在它發現數據的那一刻,它將開始轉換它並將資料傳送到已註冊的輸出元件。

阻塞轉換將停止整個管道的資料處理——輸入緩衝區將一直等到所有資料都到達輸入。這意味著它將等到連線到轉換的管道中的所有源都已從其源讀取所有資料,並且所有轉換在處理傳入資料之前。當從連線的源讀取所有資料並在管道下進行轉換時,阻塞轉換將開始轉換。因此,在阻塞轉換的轉換中,您將可以訪問記憶體中緩衝的所有資料。例如,排序元件是阻塞轉換。它將等待所有資料到達轉換塊——然後它將對其進行排序並將已排序的資料釋出到其輸出。

目標元件

目標元件通常只有一個輸入。它們定義資料的目標,例如資料庫表或CSV檔案。目前,有DBDestinationCSVDestination已經實現。如果您需要另一個目標元件,您可以擴充套件CustomDestinationgithub提出一個問題

每個Destination都帶有一個輸入緩衝區。

雖然CSV目標的Destination將開啟一個檔案流,一旦資料到達就會將資料寫入其中,而DB目標將逐批執行此操作——因此,它將等到輸入緩衝區達到批處理大小(或資料是最後一批),然後使用批量插入將其插入資料庫。

一個簡單的資料流

讓我們看一下像這樣的簡單資料流:

CSV檔案(源)->行轉換->DB目標

由於資料流任務基於與控制流任務相同的基礎,因此您首先應該像控制流任務一樣設定連線。

ControlFlow.CurrentDbConnection = 
new SqlConnectionManager(new ConnectionString("Data Source=.;Integrated Security=SSPI;"));

現在我們需要建立一個源,在這個例子中,它可以包含訂單資料。如下所示:

CSVSource sourceOrderData = new CSVSource("demodata.csv");

我們現在新增一個行轉換。CSVSource的預設輸出格式是string陣列。在此示例中,我們將CSV string陣列轉換為Order物件。

RowTransformation<string[], Order> rowTrans = new RowTransformation<string[], Order>(
  row => new Order(row)
);    

現在我們需要建立一個目標。請注意,目標是使用Order物件鍵入的。

DBDestination<Order> dest = new DBDestination<Order>("dbo.OrderTable");

到目前為止,我們只建立了元件,但是我們沒有定義資料流管道。我們現在就這樣做:

sourceOrderData.LinkTo(rowTrans);
rowTrans.LinkTo(dest);

這將建立一個數據流管道CSVSource-> RowTransformation->DBDestination

現在我們將給源命令開始讀取資料。

source.Execute();

此程式碼將作為非同步任務執行。如果要等待資料流管道完成,請將此行新增到程式碼中:

dest.Wait();

dest.Wait()返回時,所有的資料從源中讀取和寫入資料庫表。

示例

現在讓我們一起使用所有控制流任務和資料流元件。在此示例中,我們要建立資料庫和表,並將輸入檔案中的一些資料載入到表中。此外,在載入資料時,我們將進行非常簡單的轉換。

環境

對於此演示,您可以使用Visual Studio for Mac和執行在docker映象中的SQL Server for Linux。用於在Mac上管理SQL Server的使用者介面將是Azure Data Studio

首先,我們需要在ubuntu上啟動執行SQL Serverdocker映象。在終端中執行以下命令列語句以啟動容器。

docker run -d --name sql_server_demo -e 'ACCEPT_EULA=Y' 
-e 'SA_PASSWORD=reallyStrongPwd123' -p  1433:1433 microsoft/mssql-server-linux

使用該命令docker ps,我們可以看到容器已啟動並正在執行。

現在建立一個新的dotnet核心控制檯應用程式。使用GUI執行此操作或執行以下命令:

dotnet new console

將當前版本的ETLBox作為包新增到專案中。

dotnet add package ETLBox

現在,您將能夠使用ETLBox附帶的全套工具。

開始編碼

現在進入static main方法。出於演示目的,將直接在此處新增以下程式碼。在現實生活中,您可能會建立一些物件。

首先,我們需要在static控制流物件中儲存連線字串。

ControlFlow.CurrentDbConnection = new SqlConnectionManager(new ConnectionString
           ("Data Source=.;Integrated Security=false;User=sa;password=reallyStrongPwd123"));

有了CreateDatabaseTask,我們將建立一個新的資料庫。

CreateDatabaseTask.Create("demo");

此外,我們希望更改與剛剛建立的資料庫的連線,並使用CreateTableTask在其中建立一個表。

ControlFlow.CurrentDbConnection = new SqlConnectionManager(new ConnectionString
("Data Source=.;Integrated Security=false;User=sa;password=reallyStrongPwd123;Initial Catalog=demo"));

            CreateTableTask.Create("dbo.table1", new List<TableColumn>()
            {
                new TableColumn("ID","int",allowNulls:false, isPrimaryKey:true, isIdentity:true),
                new TableColumn("Col1","nvarchar(100)",allowNulls:true),
                new TableColumn("Col2","smallint",allowNulls:true)
            });

新增nlog.config

在我們測試我們的演示專案之前,我們希望顯示一些日誌輸出。ETLBox日誌記錄構建在nlog上。在etlbox網站上,您將找到如何使用nlog配置日誌記錄的示例。將以下行作為nlog.config新增到專案根目錄。確保將其複製到輸出目錄中。

<?xml version="1.0" encoding="utf-8"?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"

      xsi:schemaLocation="NLog NLog.xsd"

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> 
  <rules>
    <logger name="*" minlevel="Debug" writeTo="console" />
  </rules>
  <targets>
    <target name="console" xsi:type="Console" />     
  </targets>
</nlog>

執行專案

現在構建並執行專案。將彈出一個終端視窗並顯示日誌輸出。當日志記錄級別設定為debug時,您將看到針對資料庫執行的所有SQL程式碼。檢查資料庫和表是否已建立。

一個簡單的etl管道

接下來,我們要建立一個簡單的etl管道。首先,我們建立一個名為input.csv的演示CSV檔案。輸入檔案包含標題資訊和一些值。此外,我們需要將其複製到輸出目錄中。

Col1,Col2
Value,1
Value2,2
Value3,3

現在,我們建立一個CSVSource指向新建立的輸入檔案。

CSVSource source = new CSVSource("input.csv");

在繼續之前,我們需要一個可以儲存資料的物件。我們稱之為MyData

public class MyData
{
    public string Col1 { get; set; }
    public string Col2 { get; set; }
}

現在我們新增一個行轉換。行轉換將從源接收一個string陣列並將其轉換為Mydata物件。

RowTransformation<string[], MyData> row = new RowTransformation<string[], MyData>
(
    input => new MyData() 
    { Col1 = input[0], Col2 = input[1] }
);

接下來,我們新增一個指向我們表的資料庫目標。

DBDestination<MyData> dest = new DBDestination<MyData>("dbo.table1");

現在,我們需要連結我們的dataflow元件。

source.LinkTo(row);
row.LinkTo(dest);

連結元件後,我們希望源讀取輸入資料。目的地應該等到它收到所有資料。

source.Execute();
dest.Wait();

最後,我們檢查資料是否已成功載入到表中並將其寫入控制檯輸出。未完成這個我們將使用SQLTask

SqlTask.ExecuteReader("Read all data from table1",
    "select Col1, Col2 from dbo.table1",
    col1 => Console.WriteLine(col1.ToString() + ","),
    col2 => Console.WriteLine(col2.ToString())
);

再次執行

讓我們再次執行專案並檢視輸出。

您將看到資料已成功複製到資料庫表中。

完整程式碼

這是整個示例程式碼。

Program.cs檔案:

using System;
using System.Collections.Generic;
using ALE.ETLBox;
using ALE.ETLBox.ConnectionManager;
using ALE.ETLBox.ControlFlow;
using ALE.ETLBox.DataFlow;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            ControlFlow.CurrentDbConnection = new SqlConnectionManager(new ConnectionString
            ("Data Source=.;Integrated Security=false;User=sa;password=reallyStrongPwd123"));
            CreateDatabaseTask.Create("demo");
            ControlFlow.CurrentDbConnection = new SqlConnectionManager
            (new ConnectionString("Data Source=.;Integrated Security=false;
            User=sa;password=reallyStrongPwd123;Initial Catalog=demo"));

            CreateTableTask.Create("dbo.table1", new List<TableColumn>()
            {
                new TableColumn("ID","int",allowNulls:false, isPrimaryKey:true, isIdentity:true),
                new TableColumn("Col1","nvarchar(100)",allowNulls:true),
                new TableColumn("Col2","smallint",allowNulls:true)
            });

            CSVSource source = new CSVSource("input.csv");
            RowTransformation<string[], MyData> row = new RowTransformation<string[], MyData>(
            input => new MyData() { Col1 = input[0], Col2 = input[1] });
            DBDestination<MyData> dest = new DBDestination<MyData>("dbo.table1");

            source.LinkTo(row);
            row.LinkTo(dest);
            source.Execute();
            dest.Wait();

            SqlTask.ExecuteReader("Read all data from table1",
            "select Col1, Col2 from dbo.table1",
                col1 => Console.WriteLine(col1.ToString() + ","),
                col2 => Console.WriteLine(col2.ToString()));
        }

        public class MyData
        {
            public string Col1 { get; set; }
            public string Col2 { get; set; }
        }
    }
}

檔案nlog.config (複製到輸出目錄):

<?xml version="1.0" encoding="utf-8"?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"

      xsi:schemaLocation="NLog NLog.xsd"

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> 
  <rules>
    <logger name="*" minlevel="Debug" writeTo="console" />
  </rules>
  <targets>
    <target name="console" xsi:type="Console" />     
  </targets>
</nlog>

檔案input.csv(複製到輸出目錄):

Col1,Col2
Value,1
Value2,2
Value3,3

 

原文地址:https://www.codeproject.com/Articles/1273231/ETL-Jobs-with-Csharp-NET-Core