sqoop的java操作,總結歸納,含程式碼
(下面說的操作hdfs其實和操作hive意思一樣,都是資料夾)
最近要在專案中加一個sqoop的功能,需求是將hive的資料匯入至mysql,也就是export功能
由於之前沒用過sqoop,所以特地去學習怎麼使用,這裡總結下這兩天瞭解到的簡單內容
首先sqoop有兩個版本,1.4.X和1.99.X,前者俗稱為sqoop1後者成為sqoop2,然後又有apache和cloudera兩種
sqoop1和sqoop2使用方法和命令有較大不同,我先說sqoop1在java中的使用
首先我依賴使用的是這個(我們用的gradle來管理依賴)
compile(group: 'org.apache.sqoop', name: 'sqoop', version: '1.4.6-cdh5.5.2')
因為這個包裡面和我之前的有衝突,所以我還排除了下面兩個包(當時報錯資訊是找不到方法,然後一搜那個類專案中有兩個)
compile(group: 'org.apache.sqoop', name: 'sqoop', version: '1.4.6-cdh5.5.2') {
exclude group: "org.slf4j", module: "slf4j-log4j12"
exclude group: "org.apache.hadoop", module: "hadoop-core"
}
sqoop1在java中執行的時候,會使用本地模式去執行mapreduce任務,他會在本地tmp目錄下給你生成java檔案(我的電腦是在tmp下生成了很多java檔案),和你要操作的hdfs叢集上安沒安sqoop一點關係都沒有
因為和hdfs還有hadoop有關,所以你可能還需要這些依賴
hadoop-core 2.6.0-mr1-cdh5.5.2
hadoop-common 2.6.0-cdh5.5.2
hadoop-mapreduce-client-core 2.6.0-cdh5.5.2
我的專案中之前就已經有hadoop云云的依賴了,所以我直接加了sqoop的就可以用了,如果你們要單獨測試,需要先在java中把這些都加上,不然執行的時候會有classNotFound錯誤,注意版本一致性阿
然後就可以寫測試了,直接貼程式碼(hdfs我使用的是我自己本臺機器的叢集,所以地址是我自己電腦的,也沒有認證)
Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.2.14:9000/");//設定HDFS服務地址 String[] arg = new String[] {"--connect","jdbc:mysql://114.115.156.37:3306/test", "--username","xxx", "--password","xxx", "--table","persons", "--m","1", "--export-dir","hdfs://10.30.88.46:8020/user/hive/warehouse/dw_api_server.db/persons", "--input-fields-terminated-by","\t" }; String[] expandArguments = OptionsFileUtil.expandArguments(arg); SqoopTool tool = SqoopTool.getTool("export"); Configuration loadPlugins = SqoopTool.loadPlugins(conf); Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins); int res = Sqoop.runSqoop(sqoop, expandArguments); if (res == 0) { System.out.println ("成功"); }else { System.out.println("失敗"); }
192.168.2.14就是我本機器的地址,建議不要寫localhost,因為我寫這個執行不了,儘量寫ip地址,也不要寫主機名,如果hdfs要過kerberos會有額外的操作以及錯誤處理,我一會說
下面的getTool,因為我是hdfs匯出mysql,所以寫的export,如果是反過來的,是import,這個網上命令例子也很多,我就不寫了,這裡只寫匯出
這段程式碼就是將hdfs上的資料匯入到mysql中去的,命令必須使用String陣列形式操作,中間不用加空格,屬性和值記得分開寫,從上之下,mysql連線地址,使用者名稱密碼,操作的table,-m指用幾個機器來執行mr(這個有點記不起來了,可能意思不太對,求糾正),export-dir簡單說就是hdfs上的目錄,下面那個是指按什麼分隔符來插入
幾點問題
persons資料夾下放的是txt檔案,或者csv檔案,內容用\t分割的,說白了就是文字檔案,這樣語句才能執行成功,不然會報錯
文字檔案的欄位列數和mysql一致
要保證訪問hdfs的使用者有許可權操作,mysql使用者也需要有讀寫許可權
hdfs資料插入到mysql中必須型別能夠匹配或轉換,比如hdfs是字串的"aaa",mysql裡面是int,就會報錯
缺點
hdfs的文字檔案列可以和myslq不一致,他會預設依次叢左到右將hdfs的資料取出來,再從左到右的插入到mysql中去
上面的String數組裡還可以加一個屬性叫--columns可以指定要插入那列,這裡指定的列是以mysql為基準的,但取資料依舊是從左到右的從hdfs中拿,所以可能資料會錯位,所以建議結構兩邊一致,然後要取的時候全取,你們操作的時候可以看下,僅僅針對txt這類的文字檔案
hive操作同理,textfile型別表的你就當在操作文字檔案就好了
關於parquet檔案,或者說hive的parquet表
sqoop1是可以解析parquet檔案的,但在我試的過程中,有幾個前提條件
首先需要多加一個依賴
compile group: 'org.kitesdk', name: 'kite-data-mapreduce', version: '1.1.0'
我怎麼知道要加這個,因為在操作parquet檔案的時候他給我報classNotFound,就是這個包
解析parquet檔案或者hive的parquet表時候,上面的命令只需要將
"--input-fields-terminated-by","\t"
刪除即可
解析parquet表,必須在資料的目錄下有.metadata檔案,不然無法解析,hive的parquet表同理,就是資料檔案和.metadata處於一個目錄下
它會根據metadata裡面的規則去解析parquet檔案,metadata裡面定義了資料的
目前我之測試了XXX.parquet檔案可以解析,其他的沒有測試,例如avro,有一種情況不能解析,就是在hive下,表的型別是parquet,但是表裡的資料是從別的表insert過來的,那麼在hdfs上的檔案就變成了0000_0這種樣子,這樣的解析不了,我在嘗試的時候報錯,即使有metadata檔案,不過parquet表可以執行要匯出那幾列,不想txt的會錯位
報錯
我遇到的了下面幾種錯誤
Path is not a file: /user/hive/warehouse/test.db/persons
提示找不到meatadata檔案,如果你匯出的是parquet表,目錄下沒有metadata會報這個錯,同樣沒有資料檔案會報找不到資料檔案
Can not read value at 1 in block 0 in file
hive表是parquet表,而且也有metadata檔案,會有這個錯誤,指不能解析檔案,我這個檔案下是0000_0這種樣子的
classNotFound:java.sun.tools.xxxxxxxx
我第一次執行報了這個錯誤,需要將你jdk檔案中的一個Tools.jar的jar包放到你的專案中去
numberformatexception
出現轉換錯誤都是hive表或者檔案內容和mysql結構不一致,無法自動轉換的問題,改結構去,解析parquet檔案的時候會出現找不到欄位錯誤,可能是因為大小寫的原因
Can't get Master Kerberos principal for use as renewer
hdfs需要過kerberos的時候,命名使用keytab檔案已經登陸成功了,可是一直出這個問題,參考方法是將hadoop叢集上的yarn-site.xml檔案放到你專案的resource中去,然後在程式碼中加入
conf.addResource("yarn-site.xml"); // conf是你的Configuration物件
就沒事了,該問題我是參考這裡的,我把連線貼出來,感謝一下
http://www.kevin517.win/2017/11/21/%E8%BF%9E%E6%8E%A5%E5%B8%A6%E6%9C%89%20Kerberos%20%E8%AE%A4%E8%AF%81%E7%9A%84%20Hadoop%20%20HBase%20Spark/
其他錯誤
如果出現其他不知名的錯誤,請考慮版本是否一致,jar衝突這些問題
--hive和--hcatalog命令
我在找資料的時候有看到這兩個命令,直接是hive-》mysql,但是經過試驗,--hive顯示無此命令,hcatalog命令無效,提示找不到表,也不知道是版本不對還是啥,放棄了暫時
關於打包專案出現的問題
上面的問題我解決了以後,發現打包的時候缺了一個包,具體解決如下
logredactor1.0.3
https://blog.csdn.net/u011856283/article/details/80690031
然後是打包以後執行不料sqoop1的程式,顯示找不到jar,無法識別的符號,特別費解,最後找到了答案,參考網址如下
https://www.cnblogs.com/claren/p/7240735.html
對於這個問題,也可以在上面程式碼中使用如下方式解決(未測試)
SqoopOptions options = sqoop.getOptions();
options.setHadoopMapRedHome("/xxx");
===================================================================================================
sqoop2
sqoop2是直接使用程式連線到叢集上的sqoop,遠端操作,流程是需要先建立link也可以理解程要操作的物件,比如一個link是hdfs,一個link是mysql,有了link後需要建立job,建立job需要指定那兩個link進行互動,設定from和to的關係,然後執行job就可以了(我覺得sqoop2更方便)
首先依賴和上面sqoop1的不一樣,我們用的是一個sqoop-client的jar,我這裡用的是apache的版本
<dependency>
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-client</artifactId>
<version>1.99.7</version>
</dependency>
不需要其他依賴
注意:這個jar的版本請務必和你叢集上安裝的sqoop版本一致,不然會出莫名的錯誤
這裡貼一下官方的demo地址,官方寫的很全面,可以直接參考,我下面的程式也源自這裡改編
http://sqoop.apache.org/docs/1.99.6/ClientAPI.html
/**
* 建立連線
* @throws Exception
*/
public static void ImportTest() throws Exception{
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
// ==============================================
// create a placeholder for link
long connectorId = 1;
MLink link = client.createLink("hdfs-connector");
// ==============================================================================
link.setName("HDFS");
link.setCreationUser("hadoop");
MLinkConfig linkConfig = link.getConnectorLinkConfig();
linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://127.0.0.1:9000");
// ==============================================================================
Status status = client.saveLink(link);
if(status.canProceed()) {
System.out.println("Created Link with Link Id : " + link.getPersistenceId());
} else {
System.out.println("Something went wrong creating the link");
}
}
這是創造了一個hdfs的link,上面localhost:12000/sqoop指的是你叢集上的sqoop
這裡有個小問題,官方的demo或者是別的地方找到的資料
MLink link = client.createLink("hdfs-connector");
這句話的createLink中寫的都是數字,我這裡寫的是字串,這是因為版本問題,最新的版本是不能寫數字的,改成了字串,這裡的hdfs-connector指的是建立hdfs的link,瞎寫會報錯
下面貼一個mysql(通用jdbc)的link
/**
* 建立連線
* @throws Exception
*/
public static void ImportTest() throws Exception{
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
// ==============================================
// create a placeholder for link
long connectorId = 1;
MLink link = client.createLink("generic-jdbc-connector");
// ==============================================================================
link.setName("mysql_link");
link.setCreationUser("hahaha");
MLinkConfig linkConfig = link.getConnectorLinkConfig();
linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/test");
linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
linkConfig.getStringInput("linkConfig.username").setValue("hadoop");
linkConfig.getStringInput("linkConfig.password").setValue("hadoop");
// ==============================================================================
Status status = client.saveLink(link);
if(status.canProceed()) {
System.out.println("Created Link with Link Id : " + link.getPersistenceId());
} else {
System.out.println("Something went wrong creating the link");
}
}
這裡換成了generic-jdbc-connector
意思 是通用的jdbc,也就是說不止mysql,其他支援jdbc的也可以,雖然我沒試,前提需要你在叢集上的sqoop的lib下加入驅動檔案,不然不能用。
然後就是建立一個任務了
/**
* 建立任務
*/
public static void saveJob() {
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
//Creating dummy job object
MJob job = client.createJob("mysql", "HDFS");
job.setName("myJobs");
job.setCreationUser("myJobsUser");
// set the "FROM" link job config values
MFromConfig fromJobConfig = job.getFromJobConfig();
fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("test");
fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("Persons");
fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("Id_P");
// set the "TO" link job config values
MToConfig toJobConfig = job.getToJobConfig();
toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/sqoop");
// set the driver config values
MDriverConfig driverConfig = job.getDriverConfig();
//driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");
Status status = client.saveJob(job);
if(status.canProceed()) {
System.out.println("Created Job with Job Id: "+ job.getPersistenceId());
} else {
System.out.println("Something went wrong creating the job");
}
}
這裡設定from和to的關係
MJob job = client.createJob("mysql", "HDFS");
裡面的值就是你剛剛建立link填寫的name屬性,這裡指mysql到hdfs
執行任務(這段程式碼是我找到的,基本沒有改)
/**
* 啟動job
*/
public static void startJob() {
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
MJob job = client.getJob("myJobs");
//啟動任務
long jobId = job.getPersistenceId();
MSubmission submission = client.startJob("myJobs");
System.out.println("JOB提交狀態為 : " + submission.getStatus());
while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("進度 : " + String.format("%.2f %%", submission.getProgress() * 100));
//三秒報告一次進度
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("JOB執行結束... ...");
System.out.println("Hadoop任務ID為 :" + submission.getExternalJobId());
Counters counters = submission.getCounters();
if(counters != null) {
System.out.println("計數器:");
for(CounterGroup group : counters) {
System.out.print("\t");
System.out.println(group.getName());
for(Counter counter : group) {
System.out.print("\t\t");
System.out.print(counter.getName());
System.out.print(": ");
System.out.println(counter.getValue());
}
}
}
if(submission.getError() != null) {
System.out.println("JOB執行異常,異常資訊為 : " +submission.getError());
}
System.out.println("MySQL通過sqoop傳輸資料到HDFS統計執行完畢");
}
具體引數和安裝過程大家可以去自行了解,java操作大概就是這些了,有錯請指正