【Sqoop】資料轉換工具Sqoop
1、Sqoop概述
HiveQL對資料進行分析,並將結果集儲存到hdfs 檔案或hive 表中,當前端需要使用資料處理的結果時,需要將結果集匯出到RDBMS中,而Sqoop就是將常用的MapReduce(資料匯入匯出)進行封裝,通過傳遞引數的形式,執行MapReduce任務,將hdfs檔案系統、Hive或HBase中的資料匯出到RDBMS,或將RDBMS中的資料匯入到hdfs檔案系統、Hive或HBase中的。sqoop,即SQL to HADOOP的簡寫。以Hadoop 為主體,RDBMS為客體,sqoop import,就是將RDBMS資料放入hadoop 中,就是匯入import;sqoop export,就是將hadoop中的資料放入到RDBMS中,就是匯出export。sqoop 是依賴於hadoop的,需要匯入匯出的資料,儲存在hdfs中,而且底層的資料傳輸的實現使用MapReduce或YARN,Sqoop 底層的實現就是MapReduce,使用批處理方式進行資料傳輸。
2、Sqoop版本
Sqoop1和Sqoop2版本是兩個不同版本,完全不相容。其版本號劃分方式:Apache:1.4.x~ ,1.99.x~。
Sqoop2比Sqoop1的改進有:
(1)引入sqoop server,集中化管理Connector等;
(2)多種訪問方式:CLI,Web UI,REST API;
(3)引入基於角色的安全機制。
一般的Apache官方適應於Hadoop版本的編譯好的sqoop二進位制檔案並不適用於我們的hadoop版本,所以我們需要依據hadoop 版本編譯sqoop。實際生產環境中我們使用CDH(Cloudera Hadoop)版本的Hadoop,裡面有適合我們Hadoop 2.5.0版本的編譯好的sqoop。
CDH 5.3.6 版本非常的穩定和好用,我們使用的是hadoop-2.5.0-cdh5.3.6.tar.gz, hive-0.13.1-cdh5.3.6.tar.gz,zookeeper-3.4.5-cdh5.3.6.tar.gz, sqoop-1.4.5-cdh5.3.6.tar.gz,flume-ng-1.5.0-cdh5.3.6,oozie-4.0.0-cdh5.3.6和hue-3.7.0-cdh5.3.6.tar.gz,下載地址:
3、Sqoop與RDBMS的結合
Sqoop連線RDBMS的四要素: JDBCurl、username、password、tablename。
RDBMS以MySQL資料庫為例,拷貝jdbc驅動包到 cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/lib/`
Sqoop列出MySQL中的資料庫:
bin/sqoop list-databases \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306 \
--username root \
--password 123456
4、Sqoop將RDBMS中的資料匯入到HDFS中
(1)在MySQL中建表my_user,並插入資料:
CREATE TABLE `my_user` (
`id` tinyint(4) NOT NULL AUTO_INCREMENT,
`account` varchar(255) DEFAULT NULL,
`passwd` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
);
INSERT INTO `my_user` VALUES ('1', 'admin', 'admin');
INSERT INTO `my_user` VALUES ('2', 'pu', '12345');
INSERT INTO `my_user` VALUES ('3', 'system', 'system');
INSERT INTO `my_user` VALUES ('4', 'zxh', 'zxh');
INSERT INTO `my_user` VALUES ('5', 'test', 'test');
INSERT INTO `my_user` VALUES ('6', 'pudong', 'pudong');
INSERT INTO `my_user` VALUES ('7', 'qiqi', 'qiqi');
(2)將MySQL中的my_user表匯入HDFS,不指定HDFS路徑預設匯入到/user/beifeng下,以表的名字 my_user建立的目錄中,預設執行僅有map的MapReduce任務,預設使用的map數為4個:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user
(3)指定匯入的HDFS目錄,指定map數為1,因為我們的測試僅有7條資料:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_user \
--num-mappers 1
(4)資料儲存檔案格式:textfile、orcfile、parquet。匯入HDFS,並存儲為parquet格式:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_user_parquet \
--fields-terminated-by ',' \
--num-mappers 1 \
--as-parquetfile
(5)匯入HDFS,指定要匯入的列:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_user_column \
--num-mappers 1 \
--columns id,account
(6)將SQL查詢出來的指定內容匯入HDFS:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--query 'select id, account from my_user where $CONDITIONS' \
--target-dir /user/beifeng/sqoop/imp_my_user_query \
--num-mappers 1
(7)匯入HDFS並壓縮為Snappy格式:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_snappy \
--delete-target-dir \
--num-mappers 1 \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--fields-terminated-by '\t'
(8)在Hive上建立表,並將匯入到HDFS上的資料插入到Hive的表中:
drop table if exists default.hive_user_snappy ;
create table default.hive_user_snappy(
id int,
username string,
password string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;
load data inpath '/user/beifeng/sqoop/imp_my_snappy' into table default.hive_user_snappy ;
Sqoop在MySQL、HDFS和Hive之間的協作:
(1)在MySQL中建立表;
(2)Sqoop將MySQL中的表import到HDFS中,預設儲存為TEXTFILE格式,可以儲存為Snappy格式;
(3)在Hive中建立表;
(4)將匯入到HDFS中的資料load到Hive表中;
(5)在Hive中進行查詢,可以使用HiveServer2等工具以JDBC方式查詢。
(9)增量資料匯入HDFS:
通常表中欄位有一個唯一識別符號,類似於插入時間createtime,這樣,在查詢語句中可以新增查詢條件:
where createtime => 20150924000000000 and createtime < 20150925000000000
Incremental import arguments:
--check-column <column> Source column to check for incremental
change
--incremental <import-type> Define an incremental import of type
'append' or 'lastmodified'
--last-value <value> Last imported value in the incremental
check column
增量匯入HDFS:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_incr \
--num-mappers 1 \
--incremental append \
--check-column id \
--last-value 4
(10)使用direct引數將MySQL中的資料直接匯入到HDFS中:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_incr \
--num-mappers 1 \
--delete-target-dir \
--direct
5、Sqoop將HDFS中的資料匯出到RDBMS中
(1)準備資料,並上傳到HDFS:
touch /opt/datas/user.txt
vi /opt/datas/user.txt
8,beifeng,beifeng
9,xuanyun,xuanyu
bin/hdfs dfs -mkdir -p /user/beifeng/sqoop/exp/user/
bin/hdfs dfs -put /opt/datas/user.txt /user/beifeng/sqoop/exp/user/
(2)將HDFS上的資料匯出到RDBMS的表my_user中:
bin/sqoop export \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--export-dir /user/beifeng/sqoop/exp/user/ \
--num-mappers 1
6、匯入匯出Hive表
(1)將MySQL中的表匯入到Hive的表中:
在Hive中建立表:
use default ;
drop table if exists user_hive ;
create table user_hive(
id int,
account string,
password string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;
將MySQL中的my_user表匯入Hive中的user_hive表:
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--fields-terminated-by '\t' \
--delete-target-dir \
--num-mappers 1 \
--hive-import \
--hive-database default \
--hive-table user_hive
(2)將Hive中的表匯出到MySQL中:
在MySQL中建立表:
CREATE TABLE `my_user2` (
`id` tinyint(4) NOT NULL AUTO_INCREMENT,
`account` varchar(255) DEFAULT NULL,
`passwd` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
);
Hive中的表儲存在HDFS上,將Hive中的表user_hive中資料匯出到MySQL中建立的表my_user2中:
bin/sqoop export \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user2 \
--export-dir /user/hive/warehouse/user_hive \
--num-mappers 1 \
--input-fields-terminated-by '\t'
7、指令碼執行
可以將bin/hive和bin/sqoop的執行語句都寫成指令碼,然後用Shell指令碼來在命令列執行。
bin/sqoop指令碼格式如下:
## sqoop-import-hdfs.txt
import
--connect
jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test
--username
root
--password
123456
--table
my_user
--target-dir
/user/beifeng/sqoop/imp_my_user_option
--num-mappers
1
Shell指令碼格式如下:
shell scripts
## step 1
load data ...
## step 2
bin/hive -f xxxx
## step 3
bin/sqoop --options-file /opt/datas/sqoop-import-hdfs.txt