1. 程式人生 > >Spark讀取HDFS或者AFS等檔案系統檔案

Spark讀取HDFS或者AFS等檔案系統檔案

                        Spark讀取HDFS或者AFS等檔案系統檔案

Spark讀取檔案有很多方法,我這裡主要介紹一下讀取非結構化的檔案的兩種方式,針對多檔案讀取,單檔案讀取也是一樣的。

方案一:spark的textFile方法,也是最簡單的方案,支援萬用字元,簡單好用

String afsFilePath="afs://afs.yun.com/app/file/*/sss*";
// String afsFilePath="afs://afs.yun.com/app/file/text/text.txt";
// String afsFilePath="afs://afs.yun.com/app/file/text/*.log";
// 讀取之後的是String型別的Dataframe物件,一般需要轉換為RDD去處理更方便
// spark會自動去迴圈目錄下的所有檔案,可以通過萬用字元的方式讀取你想要的檔案
// 萬用字元的方式還有很多,我這裡就不一一概述了
Dataset<String> stringDataset = spark.read().textFile(afsFilePath);
JavaRDD<String> logLinesStr = stringDataset.javaRDD();

JavaRDD<String> logLinesStr = spark.read().textFile(afsFilePath).javaRDD();

Hadoop支援的萬用字元與Unix bash相同

表1 萬用字元及其含義

萬用字元 名稱 匹配
星號  匹配0或多個任意字元
?  問號 匹配單一字元
[ab]   字元類 匹配{a,b}集合中的一個字元
[^ab]  非字元類 匹配非{a,b}集合中的一個字元
[a-z] 字元範圍 匹配一個在{a,z}範圍內的字元(包含az),a在字典順序上要小於z
[^a-z]  非字元範圍  匹配一個不在{a,b}範圍內的字元(包含ab),a在字典順序上要小於b
{a,b}  或選擇 匹配包含a或b中的一個表示式

方案二:使用迴圈目錄的方式去呼叫text方法,這種方法適用於需要讀取的檔名比較複雜,或者沒有規則,而且裡面有很多不是你想要的那種檔案啥的。

原理很簡單,就是傳遞一個目錄,然後去list目錄下的檔案,然後判斷是否是檔案下,是就遞迴,不是就獲取檔案絕對路徑,因為text方法只支援絕對檔案路徑讀取單個檔案。

程式碼寫的不是很好哈,有時間了在優化一下,主要就是看一下怎麼用就好。不讀取帶@的,是因為我的系統裡面帶這個檔名的都是通知檔案,不是我想要的。

// 這段程式碼主要是讀取afs上的檔案,hdfs的也一樣
AFS_URL = "afs://xxx.xxx.xxx";
public Dataset<Row> readFilesByPath(String path) {
        FileSystem fileSystem = null;
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", AFS_URL);
        List<Path> filePathList = new ArrayList<>();
        try {
            fileSystem = FileSystem.get(conf);
            System.out.println("filePath:" + path);
            getFilesInPath(fileSystem, filePathList, path);
        } catch (IOException e) {
            e.printStackTrace();
        }
        Dataset<Row> stringDataset = null;
        for (Path filePath : filePathList) {
            String sulotionPath = filePath.toString();
            stringDataset = stringDataset == null ?
                    spark.read().text(sulotionPath) : stringDataset.union(spark.read().text(sulotionPath));
        }
        return stringDataset;
    }

    /**
     * 遞迴獲取指定目錄下的所有檔案絕對路徑
     */
    private void getFilesInPath(FileSystem fileSystem, List<Path> filesInPath, String filePath)
            throws IOException {
        Path path = new Path(filePath);
        FileStatus[] files = fileSystem.listStatus(path);
        for (FileStatus file : files) {
            if (file.isDirectory()) {
                getFilesInPath(fileSystem, filesInPath, file.getPath().toString());
            } else {
                // 不解析檔名帶有@符號的
                if (file.getPath().getName().contains("@")) {
                    continue;
                }
                System.out.println(file.getPath().toString());
                filesInPath.add(file.getPath());
            }
        }
    }