
Java API Operations on HDFS in Hadoop


Using the API

1. Preparation

1.1 Unpacking the package

Unpack the Hadoop installation package to a path that contains no Chinese characters (for example, D:\users\hadoop-2.6.0-cdh5.14.2).

1.2 Environment variable

Configure the HADOOP_HOME environment variable on Windows (the procedure is similar to configuring the JDK environment variable on Windows).
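As a quick sanity check, you can verify from the JVM that the variable is visible. This small helper class is illustrative only and not part of the Hadoop setup:

```java
// Illustrative check: print whether HADOOP_HOME is visible to the JVM
public class EnvCheck {
    public static void main(String[] args) {
        String home = System.getenv("HADOOP_HOME");
        // Environment variables set after the IDE started may not be visible
        // until the IDE is restarted, so a null here is worth investigating.
        System.out.println(home != null ? "HADOOP_HOME=" + home : "HADOOP_HOME is not set");
    }
}
```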

1.3 Creating the project

Create a Maven project in your IDE.

1.4 Dependencies

Add the following dependencies to the pom:

<dependencies>
	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.apache.logging.log4j</groupId>
		<artifactId>log4j-core</artifactId>
		<version>2.8.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-common</artifactId>
		<version>2.6.0-cdh5.14.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-client</artifactId>
		<version>2.6.0-cdh5.14.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-hdfs</artifactId>
		<version>2.6.0-cdh5.14.2</version>
	</dependency>
</dependencies>

Note: the central Maven repository does not host the CDH-related artifacts. Cloudera maintains its own repository, so the Cloudera repository must be added to the pom separately:

<repositories>
	 <repository>
		 <id>cloudera</id>
		 <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
	 </repository>
</repositories>

1.5 Testing

Create a package cn.big.data and an HdfsClient class, and use a JUnit test to create a directory:

package cn.big.data;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HdfsClient {
    @Test
    public void testMkdirs() throws IOException, InterruptedException, URISyntaxException {

        // 1 Get the file system
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"), conf, "root");

        // 2 Create the directory
        fs.mkdirs(new Path("/myApi"));

        // 3 Close the resource
        fs.close();
    }
}

1.6 Notes

If IDEA does not print logs and the console only shows the following messages:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

you need to create a new file named "log4j.properties" under the project's src/main/resources directory with the following content:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

2. Usage

2.1 Uploading a file to HDFS

@Test
    public void upLoad() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        // Set the number of replicas to 1 (the default is 3)
        configuration.set("dfs.replication","1");
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");
        //Upload the file
        fs.copyFromLocalFile(new Path("D:\\study\\codes\\hadoop\\HdfsClientDemo\\data\\hdfsDemo\\test.txt"),new Path("/myApi/"));
        //Close the resource
        fs.close();

        System.out.println("ok");
    }

2.2 Downloading a file from HDFS

@Test
    public void downLoad() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        //Download the file
        // boolean delSrc: whether to delete the source file
        // Path src: the HDFS path of the file to download
        // Path dst: the local path to download the file to
        // boolean useRawLocalFileSystem: whether to use the raw local file system (true skips writing the local .crc checksum file)
        fs.copyToLocalFile(false,new Path("/myApi/test.txt"),new Path("D:\\study\\codes\\hadoop\\HdfsClientDemo\\HdfsTest"),true);
        fs.close();
    }

2.3 Deleting a directory in HDFS

@Test
    public void dRemove() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        //Delete the directory (true enables recursive deletion)
        fs.delete(new Path("/myApi/remove"),true);
        fs.close();
    }

2.4 Renaming a file in HDFS

@Test
    public void fRename() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        //Rename the file
        fs.rename(new Path("/myApi/test.txt"),new Path("/myApi/testRename.txt"));
        fs.close();
    }

2.5 Viewing file details in HDFS

@Test
    public void testListFiles() throws IOException, URISyntaxException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        //Get the file details
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"),true);
        while (listFiles.hasNext()){
            LocatedFileStatus status = listFiles.next();
            //Print the details
            //File name
            System.out.println(status.getPath().getName());
            //Length
            System.out.println(status.getLen());
            //Permissions
            System.out.println(status.getPermission());
            //Group
            System.out.println(status.getGroup());
            //Get the stored block information
            BlockLocation[] blockLocations = status.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                //Get the host nodes storing the block
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("-------------------------------");
        }
        fs.close();
    }

2.6 Distinguishing files from directories in HDFS

@Test
    public void testListStatus() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        //Determine whether each entry is a file or a directory
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : listStatus) {
            if (fileStatus.isFile()){
                System.out.println("f:"+fileStatus.getPath().getName());
            }else {
                System.out.println("d:"+fileStatus.getPath().getName());
            }
        }
        fs.close();
    }

2.7 HDFS I/O stream operations

2.7.1 Uploading a file

@Test
    public void putFileToHDFS() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        //Create a local input stream
        FileInputStream fis = new FileInputStream(new File("D:\\study\\codes\\hadoop\\HdfsClientDemo\\HdfsTest\\test.txt"));
        //Get an HDFS output stream
        FSDataOutputStream fos = fs.create(new Path("/myApi/testIO.txt"));
        //Copy the stream
        IOUtils.copyBytes(fis,fos,configuration);
        //Close the resources
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();
    }

2.7.2 Downloading a file

@Test
    public void getFileFromHDFS() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        //Get an input stream from HDFS
        FSDataInputStream fis = fs.open(new Path("/myApi/testIO.txt"));
        //Create a local output stream
        FileOutputStream fos = new FileOutputStream(new File("D:\\study\\codes\\hadoop\\HdfsClientDemo\\HdfsTest\\IODownload.txt"));
        //Copy the stream
        IOUtils.copyBytes(fis,fos,configuration);
        //Close the resources
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();
    }

2.8 Reading a file from a specific offset

This section shows that an HDFS file can be read starting from an arbitrary position, which helps in understanding MapReduce input splits and Spark partitions. First upload the Hadoop installation package to HDFS, then download the first block:

@Test
    public void readFileSeek1() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        FSDataInputStream fis = fs.open(new Path("/myApi/hadoop-2.6.0-cdh5.14.2.tar.gz"));
        FileOutputStream fos = new FileOutputStream(new File("C:\\Users\\Dongue\\Desktop\\seek\\hadoop-2.6.0-cdh5.14.2.tar.gz.part1"));
        //Copy the first block (128 MB) of the stream
        byte[] buf = new byte[1024];
        for (int i = 0; i < 1024 * 128; i++) {
            int len = fis.read(buf);
            if (len == -1) {
                break;
            }
            fos.write(buf, 0, len);
        }
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
    }

The download succeeds.

Now download the second block:

@Test
    public void readFileSeek2() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.247.130:9000"),configuration,"root");

        FSDataInputStream fis = fs.open(new Path("/myApi/hadoop-2.6.0-cdh5.14.2.tar.gz"));
        //Seek to the start of the second block (128 MB into the file)
        fis.seek(1024*1024*128);
        FileOutputStream fos = new FileOutputStream(new File("C:\\Users\\Dongue\\Desktop\\seek\\hadoop-2.6.0-cdh5.14.2.tar.gz.part2"));
        //Copy the rest of the stream
        IOUtils.copyBytes(fis,fos,configuration);

        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
    }

Merging the files: run the following in a Windows command window:

type hadoop-2.6.0-cdh5.14.2.tar.gz.part2 >> hadoop-2.6.0-cdh5.14.2.tar.gz.part1

After merging, part1 is the complete Hadoop installation package file.
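If you prefer to stay in Java instead of the Windows type command, the same append-style merge can be sketched as follows. The class name and paths here are illustrative, not part of the tutorial's code:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative Java equivalent of `type part2 >> part1`:
// append the bytes of part2 onto the end of part1.
public class MergeParts {
    public static void merge(Path part1, Path part2) throws IOException {
        try (OutputStream out = Files.newOutputStream(part1, StandardOpenOption.APPEND)) {
            Files.copy(part2, out); // stream part2's bytes onto the end of part1
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo on temporary files rather than the real .part1/.part2 paths
        Path p1 = Files.createTempFile("demo", ".part1");
        Path p2 = Files.createTempFile("demo", ".part2");
        Files.write(p1, "hello ".getBytes());
        Files.write(p2, "world".getBytes());
        merge(p1, p2);
        System.out.println(new String(Files.readAllBytes(p1))); // prints "hello world"
    }
}
```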