使用GO語言通過Stream Load實現Doris資料匯入
阿新 • • 發佈:2022-04-12
Doris github地址歡迎加Star
apache/incubator-doris: Apache Doris(Incubating) is an MPP-based interactive SQL data warehousing for reporting and analysis. (github.com)github.com/apache/incubator-doris本文使用的GO是1.17.2
Doris 0.15.0 release版
Doris的資料匯入有各種語言的版本,但是GO語言版本的基本見不到,簡單學了一下,寫了一個簡單的Stream Load入庫的示例,僅供參考
示例中使用的表結構:
CREATE TABLE IF NOT EXISTS user_info
(
user_id LARGEINT NOT NULL COMMENT "使用者id",
username varchar(50) NOT NULL COMMENT "使用者名稱",
city VARCHAR(20) COMMENT "使用者所在城市",
age SMALLINT COMMENT "使用者年齡",
sex TINYINT COMMENT "使用者性別",
phone LARGEINT COMMENT "電話",
address VARCHAR(500) COMMENT "地址",
register_time datetime COMMENT "使用者註冊時間"
)
Unique KEY(user_id, username)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "3"
);
下面是GO的示例程式碼,其中支援從檔案匯入,從記憶體資料匯入,同時提供了獲取BE節點列表的方法,你在匯入的時候可以從這裡隨機獲取一個BE節點IP及埠,直連BE進行匯入
package main
import (
"container/list"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/gofrs/uuid"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
)
type StreamLoad struct {
url string
dbName string
tableName string
data string
userName string
password string
}
//實現Doris使用者認證資訊
func auth(load StreamLoad) string {
s := load.userName + ":" + load.password
b := []byte(s)
sEnc := base64.StdEncoding.EncodeToString(b)
fmt.Printf("enc=[%s]\n", sEnc)
sDec, err := base64.StdEncoding.DecodeString(sEnc)
if err != nil {
fmt.Printf("base64 decode failure, error=[%v]\n", err)
} else {
fmt.Printf("dec=[%s]\n", sDec)
}
return sEnc
}
//使用Stream load將檔案資料匯入到Doris對應的資料表中
func batch_load_file(load StreamLoad, file string) {
client := &http.Client{}
//生成要訪問的url
url := "http://10.220.146.10:8030/api/test_2/user_info/_stream_load"
//fmt.Formatter(.Format(url,load.dbName,l))
fileContext, err := ioutil.ReadFile(file)
if err != nil {
log.Println("Failed to Read the File", file, err)
}
record := strings.NewReader(string(fileContext))
//提交請求
reqest, err := http.NewRequest(http.MethodPut, url, record)
//增加header選項
reqest.Header.Add("Authorization", "basic "+auth(load))
reqest.Header.Add("EXPECT", "100-continue")
var u1 = uuid.Must(uuid.NewV4())
reqest.Header.Add