1. 程式人生 > >【Sqoop】資料轉換工具Sqoop

【Sqoop】資料轉換工具Sqoop

1、Sqoop概述

HiveQL對資料進行分析,並將結果集儲存到hdfs 檔案或hive 表中,當前端需要使用資料處理的結果時,需要將結果集匯出到RDBMS中,而Sqoop就是將常用的MapReduce(資料匯入匯出)進行封裝,通過傳遞引數的形式,執行MapReduce任務,將hdfs檔案系統、Hive或HBase中的資料匯出到RDBMS,或將RDBMS中的資料匯入到hdfs檔案系統、Hive或HBase中的。sqoop,即SQL to HADOOP的簡寫。以Hadoop 為主體,RDBMS為客體,sqoop import,就是將RDBMS資料放入hadoop 中,就是匯入import;sqoop export,就是將hadoop中的資料放入到RDBMS中,就是匯出export。sqoop 是依賴於hadoop的,需要匯入匯出的資料,儲存在hdfs中,而且底層的資料傳輸的實現使用MapReduce或YARN,Sqoop 底層的實現就是MapReduce,使用批處理方式進行資料傳輸。

2、Sqoop版本

Sqoop1和Sqoop2版本是兩個不同版本,完全不相容。其版本號劃分方式:Apache:1.4.x~ ,1.99.x~。
Sqoop2比Sqoop1的改進有:
(1)引入sqoop server,集中化管理Connector等;
(2)多種訪問方式:CLI,Web UI,REST API;
(3)引入基於角色的安全機制。
一般的Apache官方適應於Hadoop版本的編譯好的sqoop二進位制檔案並不適用於我們的hadoop版本,所以我們需要依據hadoop 版本編譯sqoop。實際生產環境中我們使用CDH(Cloudera Hadoop)版本的Hadoop,裡面有適合我們Hadoop 2.5.0版本的編譯好的sqoop。
CDH 5.3.6 版本非常的穩定和好用,我們使用的是hadoop-2.5.0-cdh5.3.6.tar.gz, hive-0.13.1-cdh5.3.6.tar.gz,zookeeper-3.4.5-cdh5.3.6.tar.gz, sqoop-1.4.5-cdh5.3.6.tar.gz,flume-ng-1.5.0-cdh5.3.6,oozie-4.0.0-cdh5.3.6和hue-3.7.0-cdh5.3.6.tar.gz,下載地址:

http://archive.cloudera.com/cdh5/cdh/5/。

3、Sqoop與RDBMS的結合

Sqoop連線RDBMS的四要素: JDBCurl、username、password、tablename。
RDBMS以MySQL資料庫為例,拷貝jdbc驅動包到SQOOPHOME/libSQOOP_HOME/lib目錄下, ` cp /opt/software/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/lib/`
Sqoop列出MySQL中的資料庫:

bin/sqoop list-databases \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306 \
--username root \
--password 123456

4、Sqoop將RDBMS中的資料匯入到HDFS中

(1)在MySQL中建表my_user,並插入資料:

CREATE TABLE `my_user` (
  `id` tinyint(4) NOT NULL AUTO_INCREMENT,
  `account` varchar(255) DEFAULT NULL,
  `passwd` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

INSERT INTO `my_user` VALUES ('1', 'admin', 'admin');
INSERT INTO `my_user` VALUES ('2', 'pu', '12345');
INSERT INTO `my_user` VALUES ('3', 'system', 'system');
INSERT INTO `my_user` VALUES ('4', 'zxh', 'zxh');
INSERT INTO `my_user` VALUES ('5', 'test', 'test');
INSERT INTO `my_user` VALUES ('6', 'pudong', 'pudong');
INSERT INTO `my_user` VALUES ('7', 'qiqi', 'qiqi');

(2)將MySQL中的my_user表匯入HDFS,不指定HDFS路徑預設匯入到/user/beifeng下,以表的名字 my_user建立的目錄中,預設執行僅有map的MapReduce任務,預設使用的map數為4個:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user

(3)指定匯入的HDFS目錄,指定map數為1,因為我們的測試僅有7條資料:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_user \
--num-mappers 1

(4)資料儲存檔案格式:textfile、orcfile、parquet。匯入HDFS,並存儲為parquet格式:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_user_parquet \
--fields-terminated-by ',' \
--num-mappers 1 \
--as-parquetfile

(5)匯入HDFS,指定要匯入的列:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_user_column \
--num-mappers 1 \
--columns id,account

(6)將SQL查詢出來的指定內容匯入HDFS:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--query 'select id, account from my_user where $CONDITIONS' \
--target-dir /user/beifeng/sqoop/imp_my_user_query \
--num-mappers 1

(7)匯入HDFS並壓縮為Snappy格式:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_snappy \
--delete-target-dir \
--num-mappers 1 \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--fields-terminated-by '\t'

(8)在Hive上建立表,並將匯入到HDFS上的資料插入到Hive的表中:

drop table if exists default.hive_user_snappy ;
create table default.hive_user_snappy(
id int,
username string,
password string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ;

load data inpath '/user/beifeng/sqoop/imp_my_snappy' into table default.hive_user_snappy ;

Sqoop在MySQL、HDFS和Hive之間的協作:
(1)在MySQL中建立表;
(2)Sqoop將MySQL中的表import到HDFS中,預設儲存為TEXTFILE格式,可以儲存為Snappy格式;
(3)在Hive中建立表;
(4)將匯入到HDFS中的資料load到Hive表中;
(5)在Hive中進行查詢,可以使用HiveServer2等工具以JDBC方式查詢。

(9)增量資料匯入HDFS:
通常表中欄位有一個唯一識別符號,類似於插入時間createtime,這樣,在查詢語句中可以新增查詢條件:
where createtime => 20150924000000000 and createtime < 20150925000000000

Incremental import arguments:
   --check-column <column>        Source column to check for incremental
                                  change
   --incremental <import-type>    Define an incremental import of type
                                  'append' or 'lastmodified'
   --last-value <value>           Last imported value in the incremental
                                  check column	

增量匯入HDFS:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_incr \
--num-mappers 1 \
--incremental append \
--check-column id \
--last-value 4

(10)使用direct引數將MySQL中的資料直接匯入到HDFS中:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--target-dir /user/beifeng/sqoop/imp_my_incr \
--num-mappers 1 \
--delete-target-dir \
--direct

5、Sqoop將HDFS中的資料匯出到RDBMS中

(1)準備資料,並上傳到HDFS:

touch /opt/datas/user.txt
vi /opt/datas/user.txt
8,beifeng,beifeng
9,xuanyun,xuanyu

bin/hdfs dfs -mkdir -p /user/beifeng/sqoop/exp/user/ 
bin/hdfs dfs -put /opt/datas/user.txt /user/beifeng/sqoop/exp/user/

(2)將HDFS上的資料匯出到RDBMS的表my_user中:

bin/sqoop export \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--export-dir /user/beifeng/sqoop/exp/user/ \
--num-mappers 1

6、匯入匯出Hive表

(1)將MySQL中的表匯入到Hive的表中:
在Hive中建立表:

use default ;
drop table if exists user_hive ;
create table user_hive(
id int,
account string,
password string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;

將MySQL中的my_user表匯入Hive中的user_hive表:

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user \
--fields-terminated-by '\t' \
--delete-target-dir \
--num-mappers 1 \
--hive-import \
--hive-database default \
--hive-table user_hive

(2)將Hive中的表匯出到MySQL中:
在MySQL中建立表:

CREATE TABLE `my_user2` (
  `id` tinyint(4) NOT NULL AUTO_INCREMENT,
  `account` varchar(255) DEFAULT NULL,
  `passwd` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

Hive中的表儲存在HDFS上,將Hive中的表user_hive中資料匯出到MySQL中建立的表my_user2中:

bin/sqoop export \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user2 \
--export-dir /user/hive/warehouse/user_hive \
--num-mappers 1 \
--input-fields-terminated-by '\t'

7、指令碼執行

可以將bin/hive和bin/sqoop的執行語句都寫成指令碼,然後用Shell指令碼來在命令列執行。
bin/sqoop指令碼格式如下:

## sqoop-import-hdfs.txt

import
--connect 
jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test
--username 
root
--password 
123456 
--table 
my_user 
--target-dir 
/user/beifeng/sqoop/imp_my_user_option
--num-mappers 
1

Shell指令碼格式如下:

shell scripts
	## step 1
	load data ...
	## step 2
	bin/hive -f xxxx
	## step 3
	bin/sqoop --options-file /opt/datas/sqoop-import-hdfs.txt