1. 程式人生 > >springboot環境對Hadoop的HDFS操作

springboot環境對Hadoop的HDFS操作

之前已經介紹瞭如何搭建CentOS虛擬機器並且安裝Hadoop,使用命令成功訪問操作Hadoop的hdfs,接下來介紹如果使用java 程式碼操作Hadoop的hdfs.

一、環境準備

1.CentOS7

2.Hadoop3.1.1

3.SpringBoot2.1.0

程式碼地址:springboot整合hadoop專案程式碼

二、開發準備

1.使用IDEA新建一個SpringBoot專案,這個很簡單不懂的可以看我之前發的文章或者上網搜一下就可以了,這個是我的專案。

2.pom新增hadoop所依賴的jar

在剛建的SpringBoot專案的pom.xml檔案裡新增hadoop的依賴包hadoop-common, hadoop-client, hadoop-hdfs

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>3.1.1</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-hdfs</artifactId>
   <version>3.1.1</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>3.1.1</version>
</dependency>

 3.啟動hadoop服務

怎麼啟動hadoop的hdfs服務可以看我之前文章 hdfs啟動 

首先我們進入linux的hadoop安裝目錄,然後進入sbin目錄下,執行start-dfs.sh即可

cd /usr/local/hadoop/hadoop-3.1.1
/usr/local/hadoop/hadoop-3.1.1

啟動後我們就可以訪問下hdfs的檔案管理系統,地址就是hdfs-site.xml中配置的地址,我的linux虛擬機器IP是192.168.187.128,所以我的訪問地址是 http://192.168.187.128:50070/explorer.html#/

  就可以看到我們的檔案管理了。

三、開始編碼

對HDFS操作設計以下幾個主要的類:

Configuration:封裝了客戶端或者伺服器的配置資訊

FileSystem:此類的物件是一個檔案系統物件,可以用該物件的一些方法來對檔案進行操作通過FileSystem的靜態方法get獲得該物件,例:FileSystem hdfs = FileSystem.get(conf);

FSDataInputStream:這是HDFS中的輸入流,通過由FileSystem的open方法獲取

FSDataOutputStream:這是HDFS中的輸出流,通過由FileSystem的create方法獲取

1.配置hadoop資訊

我們在application.yml 裡面配置hdfs的IP地址和使用者名稱資訊

spring:
  application:
    name: hadoop
server:
  port: 9000


hdfs:
  path: hdfs://192.168.187.128:9000
  username: root

 2.我們封裝一個hadoop連線的工具類HadoopUtil用來訪問hdfs伺服器

  HadoopUtil 工具類完整程式碼:

/**
 * 類或方法的功能描述 :Hadoop工具類
 * @date: 2018-11-28 13:59
 */
@Component
public class HadoopUtil {
    @Value("${hdfs.path}")
    private String path;
    @Value("${hdfs.username}")
    private String username;

    private static String hdfsPath;
    private static String hdfsName;

    /**
     * 獲取HDFS配置資訊
     * @return
     */
    private static Configuration getConfiguration() {

        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    /**
     * 獲取HDFS檔案系統物件
     * @return
     * @throws Exception
     */
    public static FileSystem getFileSystem() throws Exception {
        // 客戶端去操作hdfs時是有一個使用者身份的,預設情況下hdfs客戶端api會從jvm中獲取一個引數作為自己的使用者身份 DHADOOP_USER_NAME=hadoop
//        FileSystem hdfs = FileSystem.get(getHdfsConfig()); //預設獲取
//        也可以在構造客戶端fs物件時,通過引數傳遞進去
        FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
        return fileSystem;
    }

    @PostConstruct
    public void getPath() {
        hdfsPath = this.path;
    }
    @PostConstruct
    public void getName() {
        hdfsName = this.username;
    }

    public static String getHdfsPath() {
        return hdfsPath;
    }

    public String getUsername() {
        return username;
    }
}

客戶端去操作HDFS時,是有一個使用者身份的,預設情況下,HDFS客戶端API會從JVM中獲取一個引數來作為自己的使用者身份:DHADOOP_USER_NAME=hadoop
FileSystem hdfs = FileSystem.get(getHdfsConfig()); //預設獲取
也可以在構造客戶端fs物件時,通過引數傳遞進去
FileSystem hdfs = FileSystem.get(new URI(rootPath), getHdfsConfig(), "你的使用者名稱");

最終我們通過呼叫 HadoopUtil.getFileSystem() 就可以獲取到hdfs的檔案管理

3.建立資料夾

首先我們檢視hdfs檔案系統上面只有一個Java資料夾,現在我們來建立一個叫demo的資料夾

我們通過RPC的方式來操作hdfs,首先新建一個 HadoopController 類,建立一個 mkdir 方法用來新建資料夾

/**
 * 類或方法的功能描述 :TODO
 * @date: 2018-11-28 13:51
 */
@RestController
@RequestMapping("/hadoop")
public class HadoopController {

    /**
     * 建立資料夾
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/mkdir")
    public BaseReturnVO mkdir(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return new BaseReturnVO("請求引數為空");
        }
        // 檔案物件
        FileSystem fs = HadoopUtil.getFileSystem();
        // 目標路徑
        Path newPath = new Path(path);
        // 建立空資料夾
        boolean isOk = fs.mkdirs(newPath);
        fs.close();
        if (isOk) {
            return new BaseReturnVO("create dir success");
        } else {
            return new BaseReturnVO("create dir fail");
        }
    }
} 

首先先通過我們封裝的 HadoopUtil  跟hdfs檔案系統建立連線,獲得檔案物件,然後根據入參 path 建立一個新的目標路徑 newPath,之後呼叫檔案物件的mkdir方法即可建立空資料夾,還是挺簡單的。

使用Postman傳送請求,引數為新建的目錄路徑 /demo

 

建立成功後我們去hdfs檔案系統檢視,發現有了demo這個檔案夾了

4.建立檔案

在HadoopController中新建一個createFile方法,在剛剛新建的demo資料夾中新建一個檔案

/**
     * 建立檔案
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/createFile")
    public BaseReturnVO createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file) throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return new BaseReturnVO("請求引數為空");
        }
        String fileName = file.getOriginalFilename();
        FileSystem fs = HadoopUtil.getFileSystem();
        // 上傳時預設當前目錄,後面自動拼接檔案的目錄
        Path newPath = new Path(path + "/" + fileName);
        // 開啟一個輸出流
        FSDataOutputStream outputStream = fs.create(newPath);
        outputStream.write(file.getBytes());
        outputStream.close();
        fs.close();
        return new BaseReturnVO("create file success");
    }

上傳的時候我們把檔名拼接在newPath後面自動根據檔名稱生成檔案路徑

上傳成功了,我們接著多上傳幾個檔案方便後面查詢使用

5.讀取檔案

新建一個readFile方法用來讀取我們上傳的檔案,我們讀取1.txt 檔案的路徑是 /demo/1.txt

/**
     * 讀取HDFS檔案內容
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/readFile")
    public BaseReturnVO readFile(@RequestParam("path") String path) throws Exception {
        FileSystem fs = HadoopUtil.getFileSystem();
        Path newPath = new Path(path);
        InputStream in = null;
        try {
            in = fs.open(newPath);
            // 複製到標準的輸出流
            IOUtils.copyBytes(in, System.out, 4096);
        } finally {
            IOUtils.closeStream(in);
            fs.close();
        }
        return new BaseReturnVO("讀取成功");
    }

我們把檔案資訊輸出到了控制檯了

6.讀取目錄資訊

新建一個readPathInfo方法,讀取/demo下面的資訊

/**
     * 讀取HDFS目錄資訊
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/readPathInfo")
    public BaseReturnVO readPathInfo(@RequestParam("path") String path) throws Exception {
        FileSystem fs = HadoopUtil.getFileSystem();
        Path newPath = new Path(path);
        FileStatus[] statusList = fs.listStatus(newPath);
        List<Map<String, Object>> list = new ArrayList<>();
        if (null != statusList && statusList.length > 0) {
            for (FileStatus fileStatus : statusList) {
                Map<String, Object> map = new HashMap<>();
                map.put("filePath", fileStatus.getPath());
                map.put("fileStatus", fileStatus.toString());
                list.add(map);
            }
            return new BaseReturnVO(list);
        } else {
            return new BaseReturnVO("目錄內容為空");
        }
    }

7.讀取檔案列表

新建一個方法listFile讀取/demo下的所有檔案

 /**
     * 讀取檔案列表
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/listFile")
    public BaseReturnVO listFile(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return new BaseReturnVO("請求引數為空");
        }
        FileSystem fs = HadoopUtil.getFileSystem();
        Path newPath = new Path(path);
        // 遞迴找到所有檔案
        RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(newPath, true);
        List<Map<String, String>> returnList = new ArrayList<>();
        while (filesList.hasNext()) {
            LocatedFileStatus next = filesList.next();
            String fileName = next.getPath().getName();
            Path filePath = next.getPath();
            Map<String, String> map = new HashMap<>();
            map.put("fileName", fileName);
            map.put("filePath", filePath.toString());
            returnList.add(map);
        }
        fs.close();
        return new BaseReturnVO(returnList);
    }

8.重新命名檔案 

新建一個renameFile方法

/**
     * 重新命名檔案
     * @param oldName
     * @param newName
     * @return
     * @throws Exception
     */
    @PostMapping("/renameFile")
    public BaseReturnVO renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName) throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return new BaseReturnVO("請求引數為空");
        }
        FileSystem fs = HadoopUtil.getFileSystem();
        Path oldPath = new Path(oldName);
        Path newPath = new Path(newName);
        boolean isOk = fs.rename(oldPath, newPath);
        fs.close();
        if (isOk) {
            return new BaseReturnVO("rename file success");
        } else {
            return new BaseReturnVO("rename file fail");
        }
    }

9.刪除檔案 

 新建一個deleteFile 方法

/**
     * 刪除檔案
     *
     * @param path
     * @return
     * @throws Exception
     */
    @PostMapping("/deleteFile")
    public BaseReturnVO deleteFile(@RequestParam("path") String path) throws Exception {
        FileSystem fs = HadoopUtil.getFileSystem();
        Path newPath = new Path(path);
        boolean isOk = fs.deleteOnExit(newPath);
        fs.close();
        if (isOk) {
            return new BaseReturnVO("delete file success");
        } else {
            return new BaseReturnVO("delete file fail");
        }
    }

10.上傳本地檔案到hdfs

新建一個uploadFile方法,把我本地D盤的hello.txt檔案上傳上去

/**
     * 上傳檔案
     *
     * @param path
     * @param uploadPath
     * @return
     * @throws Exception
     */
    @PostMapping("/uploadFile")
    public BaseReturnVO uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath) throws Exception {
        FileSystem fs = HadoopUtil.getFileSystem();
        // 上傳路徑
        Path clientPath = new Path(path);
        // 目標路徑
        Path serverPath = new Path(uploadPath);

        // 呼叫檔案系統的檔案複製方法,第一個引數是否刪除原檔案true為刪除,預設為false
        fs.copyFromLocalFile(false, clientPath, serverPath);
        fs.close();
        return new BaseReturnVO("upload file success");
    }

檔案上傳成功了,我們檢視檔案系統

11.下載hdfs檔案到本地

新建一個download方法下載hdfs上的檔案到本地的D盤的hdfs資料夾中

/**
     * 下載檔案
     * @param path
     * @param downloadPath
     * @return
     * @throws Exception
     */
    @PostMapping("/downloadFile")
    public BaseReturnVO downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath) throws Exception {
        FileSystem fs = HadoopUtil.getFileSystem();
        // 上傳路徑
        Path clientPath = new Path(path);
        // 目標路徑
        Path serverPath = new Path(downloadPath);

        // 呼叫檔案系統的檔案複製方法,第一個引數是否刪除原檔案true為刪除,預設為false
        fs.copyToLocalFile(false, clientPath, serverPath);
        fs.close();
        return new BaseReturnVO("download file success");
    }

檔案下載成功,我們開啟本地的D盤

12.hdfs之間檔案複製

新建一個copyFile方法把/java/test.txt 檔案複製到/demo/test.txt下面

/**
     * HDFS檔案複製
     * @param sourcePath
     * @param targetPath
     * @return
     * @throws Exception
     */
    @PostMapping("/copyFile")
    public BaseReturnVO copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath) throws Exception {
        FileSystem fs = HadoopUtil.getFileSystem();
        // 原始檔案路徑
        Path oldPath = new Path(sourcePath);
        // 目標路徑
        Path newPath = new Path(targetPath);

        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            inputStream = fs.open(oldPath);
            outputStream = fs.create(newPath);

            IOUtils.copyBytes(inputStream, outputStream, 1024*1024*64,false);
            return new BaseReturnVO("copy file success");
        } finally {
            inputStream.close();
            outputStream.close();
            fs.close();
        }
    }