1. 程式人生 > >sqoop的java操作,總結歸納,含程式碼

sqoop的java操作,總結歸納,含程式碼

(下面說的操作hdfs其實和操作hive意思一樣,都是資料夾)

最近要在專案中加一個sqoop的功能,需求是將hive的資料匯入至mysql,也就是export功能

由於之前沒用過sqoop,所以特地去學習怎麼使用,這裡總結下這兩天瞭解到的簡單內容

首先sqoop有兩個版本,1.4.X和1.99.X,前者俗稱為sqoop1後者成為sqoop2,然後又有apache和cloudera兩種

sqoop1和sqoop2使用方法和命令有較大不同,我先說sqoop1在java中的使用

首先我依賴使用的是這個(我們用的gradle來管理依賴)

compile(group: 'org.apache.sqoop', name: 'sqoop', version: '1.4.6-cdh5.5.2')

因為這個包裡面和我之前的有衝突,所以我還排除了下面兩個包(當時報錯資訊是找不到方法,然後一搜那個類專案中有兩個)

compile(group: 'org.apache.sqoop', name: 'sqoop', version: '1.4.6-cdh5.5.2') {
        exclude group: "org.slf4j", module: "slf4j-log4j12"
        exclude group: "org.apache.hadoop", module: "hadoop-core"
    }

sqoop1在java中執行的時候,會使用本地模式去執行mapreduce任務,他會在本地tmp目錄下給你生成java檔案(我的電腦是在tmp下生成了很多java檔案),和你要操作的hdfs叢集上安沒安sqoop一點關係都沒有

因為和hdfs還有hadoop有關,所以你可能還需要這些依賴

hadoop-core 2.6.0-mr1-cdh5.5.2

hadoop-common 2.6.0-cdh5.5.2

hadoop-mapreduce-client-core 2.6.0-cdh5.5.2

我的專案中之前就已經有hadoop云云的依賴了,所以我直接加了sqoop的就可以用了,如果你們要單獨測試,需要先在java中把這些都加上,不然執行的時候會有classNotFound錯誤,注意版本一致性阿

然後就可以寫測試了,直接貼程式碼(hdfs我使用的是我自己本臺機器的叢集,所以地址是我自己電腦的,也沒有認證)

Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://192.168.2.14:9000/");//設定HDFS服務地址
String[] arg = new String[] {"--connect","jdbc:mysql://114.115.156.37:3306/test",
                "--username","xxx",
                "--password","xxx",
                "--table","persons",
                "--m","1",
                "--export-dir","hdfs://10.30.88.46:8020/user/hive/warehouse/dw_api_server.db/persons",
                "--input-fields-terminated-by","\t"
        		};
String[] expandArguments = OptionsFileUtil.expandArguments(arg);
        SqoopTool tool = SqoopTool.getTool("export");
        Configuration loadPlugins = SqoopTool.loadPlugins(conf);
        Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, loadPlugins);

        int res = Sqoop.runSqoop(sqoop, expandArguments);
        if (res == 0) {
            System.out.println ("成功");
        }else {
        	System.out.println("失敗");
        }

192.168.2.14就是我本機器的地址,建議不要寫localhost,因為我寫這個執行不了,儘量寫ip地址,也不要寫主機名,如果hdfs要過kerberos會有額外的操作以及錯誤處理,我一會說

下面的getTool,因為我是hdfs匯出mysql,所以寫的export,如果是反過來的,是import,這個網上命令例子也很多,我就不寫了,這裡只寫匯出

這段程式碼就是將hdfs上的資料匯入到mysql中去的,命令必須使用String陣列形式操作,中間不用加空格,屬性和值記得分開寫,從上之下,mysql連線地址,使用者名稱密碼,操作的table,-m指用幾個機器來執行mr(這個有點記不起來了,可能意思不太對,求糾正),export-dir簡單說就是hdfs上的目錄,下面那個是指按什麼分隔符來插入

幾點問題

persons資料夾下放的是txt檔案,或者csv檔案,內容用\t分割的,說白了就是文字檔案,這樣語句才能執行成功,不然會報錯

文字檔案的欄位列數和mysql一致

要保證訪問hdfs的使用者有許可權操作,mysql使用者也需要有讀寫許可權

hdfs資料插入到mysql中必須型別能夠匹配或轉換,比如hdfs是字串的"aaa",mysql裡面是int,就會報錯

缺點

hdfs的文字檔案列可以和myslq不一致,他會預設依次叢左到右將hdfs的資料取出來,再從左到右的插入到mysql中去

上面的String數組裡還可以加一個屬性叫--columns可以指定要插入那列,這裡指定的列是以mysql為基準的,但取資料依舊是從左到右的從hdfs中拿,所以可能資料會錯位,所以建議結構兩邊一致,然後要取的時候全取,你們操作的時候可以看下,僅僅針對txt這類的文字檔案

hive操作同理,textfile型別表的你就當在操作文字檔案就好了

關於parquet檔案,或者說hive的parquet表

sqoop1是可以解析parquet檔案的,但在我試的過程中,有幾個前提條件

首先需要多加一個依賴

compile group: 'org.kitesdk', name: 'kite-data-mapreduce', version: '1.1.0'

我怎麼知道要加這個,因為在操作parquet檔案的時候他給我報classNotFound,就是這個包

解析parquet檔案或者hive的parquet表時候,上面的命令只需要將

"--input-fields-terminated-by","\t"

刪除即可

解析parquet表,必須在資料的目錄下有.metadata檔案,不然無法解析,hive的parquet表同理,就是資料檔案和.metadata處於一個目錄下

它會根據metadata裡面的規則去解析parquet檔案,metadata裡面定義了資料的

目前我之測試了XXX.parquet檔案可以解析,其他的沒有測試,例如avro,有一種情況不能解析,就是在hive下,表的型別是parquet,但是表裡的資料是從別的表insert過來的,那麼在hdfs上的檔案就變成了0000_0這種樣子,這樣的解析不了,我在嘗試的時候報錯,即使有metadata檔案,不過parquet表可以執行要匯出那幾列,不想txt的會錯位

報錯

我遇到的了下面幾種錯誤

Path is not a file: /user/hive/warehouse/test.db/persons

提示找不到meatadata檔案,如果你匯出的是parquet表,目錄下沒有metadata會報這個錯,同樣沒有資料檔案會報找不到資料檔案

Can not read value at 1 in block 0 in file

hive表是parquet表,而且也有metadata檔案,會有這個錯誤,指不能解析檔案,我這個檔案下是0000_0這種樣子的

classNotFound:java.sun.tools.xxxxxxxx

我第一次執行報了這個錯誤,需要將你jdk檔案中的一個Tools.jar的jar包放到你的專案中去

numberformatexception

出現轉換錯誤都是hive表或者檔案內容和mysql結構不一致,無法自動轉換的問題,改結構去,解析parquet檔案的時候會出現找不到欄位錯誤,可能是因為大小寫的原因

Can't get Master Kerberos principal for use as renewer

hdfs需要過kerberos的時候,命名使用keytab檔案已經登陸成功了,可是一直出這個問題,參考方法是將hadoop叢集上的yarn-site.xml檔案放到你專案的resource中去,然後在程式碼中加入

conf.addResource("yarn-site.xml"); // conf是你的Configuration物件

就沒事了,該問題我是參考這裡的,我把連線貼出來,感謝一下

http://www.kevin517.win/2017/11/21/%E8%BF%9E%E6%8E%A5%E5%B8%A6%E6%9C%89%20Kerberos%20%E8%AE%A4%E8%AF%81%E7%9A%84%20Hadoop%20%20HBase%20Spark/

其他錯誤

如果出現其他不知名的錯誤,請考慮版本是否一致,jar衝突這些問題

--hive和--hcatalog命令

我在找資料的時候有看到這兩個命令,直接是hive-》mysql,但是經過試驗,--hive顯示無此命令,hcatalog命令無效,提示找不到表,也不知道是版本不對還是啥,放棄了暫時

關於打包專案出現的問題

上面的問題我解決了以後,發現打包的時候缺了一個包,具體解決如下

logredactor1.0.3

https://blog.csdn.net/u011856283/article/details/80690031

然後是打包以後執行不料sqoop1的程式,顯示找不到jar,無法識別的符號,特別費解,最後找到了答案,參考網址如下

https://www.cnblogs.com/claren/p/7240735.html

對於這個問題,也可以在上面程式碼中使用如下方式解決(未測試)

SqoopOptions options = sqoop.getOptions();
        options.setHadoopMapRedHome("/xxx");

===================================================================================================

sqoop2

sqoop2是直接使用程式連線到叢集上的sqoop,遠端操作,流程是需要先建立link也可以理解程要操作的物件,比如一個link是hdfs,一個link是mysql,有了link後需要建立job,建立job需要指定那兩個link進行互動,設定from和to的關係,然後執行job就可以了(我覺得sqoop2更方便)

首先依賴和上面sqoop1的不一樣,我們用的是一個sqoop-client的jar,我這裡用的是apache的版本

<dependency>
    <groupId>org.apache.sqoop</groupId>
    <artifactId>sqoop-client</artifactId>
    <version>1.99.7</version>
</dependency>

不需要其他依賴

注意:這個jar的版本請務必和你叢集上安裝的sqoop版本一致,不然會出莫名的錯誤

這裡貼一下官方的demo地址,官方寫的很全面,可以直接參考,我下面的程式也源自這裡改編

http://sqoop.apache.org/docs/1.99.6/ClientAPI.html

/**
	 * 建立連線
	 * @throws Exception
	 */
	public static void ImportTest() throws Exception{
		String url = "http://localhost:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		// ==============================================
		// create a placeholder for link		
		long connectorId = 1;
		MLink link = client.createLink("hdfs-connector"); 
		// ==============================================================================
		link.setName("HDFS");
		link.setCreationUser("hadoop");
		MLinkConfig linkConfig = link.getConnectorLinkConfig();
		linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://127.0.0.1:9000");
		// ==============================================================================
		Status status = client.saveLink(link);
		if(status.canProceed()) {
		 System.out.println("Created Link with Link Id : " + link.getPersistenceId());
		} else {
		 System.out.println("Something went wrong creating the link");
		}
	}

這是創造了一個hdfs的link,上面localhost:12000/sqoop指的是你叢集上的sqoop

這裡有個小問題,官方的demo或者是別的地方找到的資料

MLink link = client.createLink("hdfs-connector"); 

這句話的createLink中寫的都是數字,我這裡寫的是字串,這是因為版本問題,最新的版本是不能寫數字的,改成了字串,這裡的hdfs-connector指的是建立hdfs的link,瞎寫會報錯

下面貼一個mysql(通用jdbc)的link

	/**
	 * 建立連線
	 * @throws Exception
	 */
	public static void ImportTest() throws Exception{
		String url = "http://localhost:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		// ==============================================
		// create a placeholder for link		
		long connectorId = 1;
		MLink link = client.createLink("generic-jdbc-connector");
		// ==============================================================================
		link.setName("mysql_link");
		link.setCreationUser("hahaha");
		MLinkConfig linkConfig = link.getConnectorLinkConfig();
		linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/test");
		linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
		linkConfig.getStringInput("linkConfig.username").setValue("hadoop");
		linkConfig.getStringInput("linkConfig.password").setValue("hadoop");
		// ==============================================================================
		Status status = client.saveLink(link);
		if(status.canProceed()) {
		 System.out.println("Created Link with Link Id : " + link.getPersistenceId());
		} else {
		 System.out.println("Something went wrong creating the link");
		}
	}
這裡換成了
generic-jdbc-connector

意思 是通用的jdbc,也就是說不止mysql,其他支援jdbc的也可以,雖然我沒試,前提需要你在叢集上的sqoop的lib下加入驅動檔案,不然不能用。

然後就是建立一個任務了

/**
	 * 建立任務
	 */
	public static void saveJob() {
		String url = "http://localhost:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		//Creating dummy job object
		MJob job = client.createJob("mysql", "HDFS");
		job.setName("myJobs");
		job.setCreationUser("myJobsUser");
		// set the "FROM" link job config values
		MFromConfig fromJobConfig = job.getFromJobConfig();
		fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("test");
		fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("Persons");
		fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("Id_P");
		// set the "TO" link job config values
		MToConfig toJobConfig = job.getToJobConfig();
		toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/sqoop");
		// set the driver config values
		MDriverConfig driverConfig = job.getDriverConfig();
		//driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

		Status status = client.saveJob(job);
		if(status.canProceed()) {
		 System.out.println("Created Job with Job Id: "+ job.getPersistenceId());
		} else {
		 System.out.println("Something went wrong creating the job");
		}
	}

這裡設定from和to的關係

MJob job = client.createJob("mysql", "HDFS");

裡面的值就是你剛剛建立link填寫的name屬性,這裡指mysql到hdfs

執行任務(這段程式碼是我找到的,基本沒有改)

/**
	 * 啟動job
	 */
	public static void startJob() {
		String url = "http://localhost:12000/sqoop/";
		SqoopClient client = new SqoopClient(url);
		MJob job = client.getJob("myJobs");
		//啟動任務
        long jobId = job.getPersistenceId();
        MSubmission submission = client.startJob("myJobs");
        System.out.println("JOB提交狀態為 : " + submission.getStatus());
        while(submission.getStatus().isRunning() && submission.getProgress() != -1) {
          System.out.println("進度 : " + String.format("%.2f %%", submission.getProgress() * 100));
          //三秒報告一次進度
          try {
            Thread.sleep(3000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
        System.out.println("JOB執行結束... ...");
        System.out.println("Hadoop任務ID為 :" + submission.getExternalJobId());
        Counters counters = submission.getCounters();
        if(counters != null) {
          System.out.println("計數器:");
          for(CounterGroup group : counters) {
            System.out.print("\t");
            System.out.println(group.getName());
            for(Counter counter : group) {
              System.out.print("\t\t");
              System.out.print(counter.getName());
              System.out.print(": ");
              System.out.println(counter.getValue());
            }
          }
        }
        if(submission.getError() != null) {
          System.out.println("JOB執行異常,異常資訊為 : " +submission.getError());
        }
        System.out.println("MySQL通過sqoop傳輸資料到HDFS統計執行完畢");
	}
具體引數和安裝過程大家可以去自行了解,java操作大概就是這些了,有錯請指正