使用Spark SQL JDBC同步資料至MySQL
阿新 • • 發佈:2020-12-21
Spark SQL JDBC
我們可以使用一個 JDBC 的連結來定義一個 Spark SQL 的表或者檢視,這裡用表來做示例:
我們先在 mysql 中建立一個需要同步的表 test:
CREATE TABLE my.test (
id BIGINT ( 20 ) PRIMARY KEY NOT NULL auto_increment,
create_time TIMESTAMP NOT NULL COMMENT '建立時間',
name VARCHAR ( 127 ) COMMENT '使用者姓名',
age INT COMMENT '使用者年齡'
);
然後在 Spark SQL 中建立一張關聯表關聯 mysql 中的表:
CREATE TABLE IF NOT EXISTS bi.test
USING jdbc
OPTIONS (
url "jdbc:mysql://localhost:3306",
dbtable "my.test",
user "root",
password "123456"
)
當我們使用 Spark SQL 向定義好的表 bi.test 中插入資料,同時也會將資料傳輸到 mysql 的 my.test 表。
insert into bi.test select null, from_unixtime(unix_timestamp()) as create_time, name, age from other_table;
注意,如果你沒有在 mysql 中提前定義好 my.test 表,Spark SQL 會自動幫你建立,不過此時 my.test 表的定義格式會和你預期的不符,即欄位型別可能不是自己想要的型別。故我們提前定義好 my.test 表。
同樣我們也可以使用 overwrite 插入資料:
insert overwrite table bi.test
select null, from_unixtime(unix_timestamp()) as create_time, name, age from othertable;
但是此時會導致提前建好的 my.tes t表被刪除,然後重新建立,這裡的重新建立是 Spark SQL 給你建立的,這裡就會有一個問題,就是上面所說的此時 Spark SQL 自動建立的my.test表的定義格式會和你預期的不符。
那如何解決這種問題呢?我們可以自己清空 bi.test 然後再使用 insert into 插入資料。
truncate table bi.test;insert into bi.test
select null, from_unixtime(unix_timestamp()) as create_time, name, age from othertable;
這樣做除了可以保持表是自己定義的結構,同時還會讓自增的 id 重新從1開始計數。