1. 程式人生 > 其它 >hdfs的Java API操作

hdfs的Java API操作

hdfs的Java API操作,包括使用FileSystem物件的方法操作hdfs、使用流物件操作hdfs。

1 環境準備

將在Windows環境中編譯過的hadoop jar包解壓縮到非中文路徑;

設定環境變數:

開啟IDEA,建立一個Maven工程;

在pom.xml檔案中新增如下依賴,點選右上角的Load Maven Changes開始下載依賴;

 1     <dependencies>
 2         <dependency>
 3             <groupId>junit</groupId>
 4             <artifactId>junit</artifactId>
 5             <
version>RELEASE</version> 6 </dependency> 7 <dependency> 8 <groupId>org.apache.logging.log4j</groupId> 9 <artifactId>log4j-core</artifactId> 10 <version>2.8.2</version> 11 </dependency
> 12 <dependency> 13 <groupId>org.apache.hadoop</groupId> 14 <artifactId>hadoop-common</artifactId> 15 <version>2.7.2</version> 16 </dependency> 17 <dependency> 18 <groupId
>org.apache.hadoop</groupId> 19 <artifactId>hadoop-client</artifactId> 20 <version>2.7.2</version> 21 </dependency> 22 <dependency> 23 <groupId>org.apache.hadoop</groupId> 24 <artifactId>hadoop-hdfs</artifactId> 25 <version>2.7.2</version> 26 </dependency> 27 <dependency> 28 <groupId>jdk.tools</groupId> 29 <artifactId>jdk.tools</artifactId> 30 <version>1.8</version> 31 <scope>system</scope> 32 <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> 33 </dependency> 34 </dependencies>

在專案的resources目錄下,新建一個file檔案,命名為"log4j.properties",並新增如下內容;

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

2 使用FileSystem物件的方法操作hdfs

    @Test
    public void testHdfsJavaAPI() throws URISyntaxException, IOException, InterruptedException {
        // 1 獲取FileSystem物件
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.102:9000"), conf, "wm");

        // 2 建立檔案目錄
        fs.mkdirs(new Path("/hdfsPath"));

        // 3 上傳檔案
        fs.copyFromLocalFile(new Path("D:/localFile.txt"), new Path("/hdfsPath/hdfsFile.txt"));

        // 4 下載檔案
        fs.copyToLocalFile(new Path("/hdfsPath/hdfsFile.txt"), new Path("D:/localFile2.txt"));

        //  5 刪除檔案
        fs.delete(new Path("/hdfsPath"), true);

        // 6 關閉資源
        fs.close();
    }
View Code

客戶端與hdfs檔案系統的連線,首先需要一個FileSystem物件,它起到“橋樑”的作用。FileSystem類有一個靜態的get()方法,用於返回FileSystem物件。get()方法共有3個引數,分別是:hdfs中NameNode地址、Configuration物件、訪問的使用者名稱。其中,FileSystem在org.apache.hadoop.fs包下,Configuration在org.apache.hadoop.conf包下,URI在Java.net包下。new URI()會丟擲異常URISyntaxException,FileSystem.get()會丟擲異常IOException、InterruptedException。

        // 1 獲取FileSystem物件
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.102:9000"), conf, "wm");

FileSystem物件的mkdirs(hdfsFilePath)方法用於建立hdfs檔案目錄。其中,Path類在org.apache.hadoop.fs包下。

        // 2 建立檔案目錄
        fs.mkdirs(new Path("/hdfsPath"));

FileSystem物件的copyFromLocalFile(localFilePath, hdfsFilePath)方法用於從本地上傳檔案至hdfs系統,copyToLocalFile(hdfsFilePath, localFilePath)方法用於下載檔案。

        // 3 上傳檔案
        fs.copyFromLocalFile(new Path("D:/localFile.txt"), new Path("/hdfsFile.txt"));
        // 4 下載檔案
        fs.copyToLocalFile(new Path("/hdfsFile.txt"), new Path("D:/localFile2.txt"));

FileSystem物件的delete(hdfsPath, boolean)方法用於刪除hdfs檔案目錄或檔案,第二個引數為true時,表示遞迴刪除。

        //  5 刪除檔案
        fs.delete(new Path("/hdfsPath"), true);

最後,關閉FileSystem物件資源。

        // 6 關閉資源
        fs.close();

3 使用流物件操作hdfs

    // 上傳檔案至hdfs
    @Test
    public void testPutFileToHdfs() throws URISyntaxException, IOException, InterruptedException {
        // 1 獲取FileSystem物件
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.102:9000"), conf, "wm");

        // 2 獲取輸入流
        FileInputStream fis = new FileInputStream("D:/localFile.txt");
        // 3 獲取輸出流
        FSDataOutputStream fos = fs.create(new Path("/hdfsFile.txt"));

        // 4 資料傳輸
        IOUtils.copyBytes(fis, fos, conf);

        // 5 關閉資源
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
    }
View Code

首先,獲取FileSystem物件。

        // 1 獲取FileSystem物件
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.102:9000"), conf, "wm");

之後,獲取輸入流與輸出流物件。當從本地上傳檔案至hdfs時,本地檔案是輸入流,hdfs檔案是輸出流;當從hdfs下載檔案至本地時,hdfs檔案是輸入流,本地檔案是輸出流。下面以上傳檔案為例。輸入流FileInputStream物件通過例項化FileInputStream類得到,建構函式引數為本地檔案路徑。輸出流FSDataOutputStream物件通過FileSystem物件的create(hdfsPath)方法得到。

        // 2 獲取輸入流
        FileInputStream fis = new FileInputStream("D:/localFile.txt");
        // 3 獲取輸出流
        FSDataOutputStream fos = fs.create(new Path("/hdfsFile.txt"));

輸入流和輸出流都獲取完畢後,使用IOUtils類的靜態方法copyBytes(InputStream,OutputStream,Configuration)進行流之間的資料傳輸。

        // 4 資料傳輸
        IOUtils.copyBytes(fis, fos, conf);

最後,關閉資源。

        // 5 關閉資源
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();

從hdfs下載檔案至本地類似,只是輸入流和輸出流的獲取有些變化,程式碼如下。

    // 從hdfs下載檔案至本地
    @Test
    public void testGetFileFromHdfs() throws URISyntaxException, IOException, InterruptedException {
        // 1 獲取FileSystem物件
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.102:9000"), conf, "wm");

        // 2 獲取輸入流
        FSDataInputStream fis = fs.open(new Path("/hdfsFile.txt"));
        // 3 獲取輸出流
        FileOutputStream fos = new FileOutputStream("D:/localFile.txt");

        // 4 資料傳輸
        IOUtils.copyBytes(fis, fos, conf);

        // 5 關閉資源
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
    }
View Code

如果只是想在客戶端檢視hdfs檔案的內容而不下載至本地,也是可以的,此時輸出流可以換成System.out物件。程式碼如下。

    // 檢視hdfs檔案內容
    @Test
    public void testCatFile() throws URISyntaxException, IOException, InterruptedException {
        // 1 獲取FileSystem物件
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.102:9000"), conf, "wm");

        // 2 獲取輸入流
        FSDataInputStream fis = fs.open(new Path("/hdfsFile.txt"));
        // 3 獲取輸出流
        OutputStream os = System.out;

        // 4 資料傳輸
        IOUtils.copyBytes(fis, os, conf);

        // 5 關閉資源
        IOUtils.closeStream(fis);
        IOUtils.closeStream(os);
        fs.close();
    }

參考:

【尚矽谷】大資料Hadoop 2.x教程丨配套實戰案例

《Hadoop權威指南 大資料的儲存與分析-第4版》