Spark讀取HDFS或者AFS等檔案系統檔案
阿新 • • 發佈:2019-01-12
Spark讀取HDFS或者AFS等檔案系統檔案
Spark讀取檔案有很多方法,我這裡主要介紹一下讀取非結構化的檔案的兩種方式,針對多檔案讀取,單檔案讀取也是一樣的。
方案一:spark的textFile方法,也是最簡單的方案,支援萬用字元,簡單好用
String afsFilePath="afs://afs.yun.com/app/file/*/sss*"; // String afsFilePath="afs://afs.yun.com/app/file/text/text.txt"; // String afsFilePath="afs://afs.yun.com/app/file/text/*.log"; // 讀取之後的是String型別的Dataframe物件,一般需要轉換為RDD去處理更方便 // spark會自動去迴圈目錄下的所有檔案,可以通過萬用字元的方式讀取你想要的檔案 // 萬用字元的方式還有很多,我這裡就不一一概述了 Dataset<String> stringDataset = spark.read().textFile(afsFilePath); JavaRDD<String> logLinesStr = stringDataset.javaRDD(); JavaRDD<String> logLinesStr = spark.read().textFile(afsFilePath).javaRDD();
Hadoop支援的萬用字元與Unix bash相同
表1 萬用字元及其含義
萬用字元 | 名稱 | 匹配 |
---|---|---|
* | 星號 | 匹配0或多個任意字元 |
? | 問號 | 匹配單一字元 |
[ab] | 字元類 | 匹配{a,b}集合中的一個字元 |
[^ab] | 非字元類 | 匹配非{a,b}集合中的一個字元 |
[a-z] | 字元範圍 | 匹配一個在{a,z}範圍內的字元(包含az),a在字典順序上要小於z |
[^a-z] | 非字元範圍 | 匹配一個不在{a,b}範圍內的字元(包含ab),a在字典順序上要小於b |
{a,b} | 或選擇 | 匹配包含a或b中的一個表示式 |
方案二:使用迴圈目錄的方式去呼叫text方法,這種方法適用於需要讀取的檔名比較複雜,或者沒有規則,而且裡面有很多不是你想要的那種檔案啥的。
原理很簡單,就是傳遞一個目錄,然後去list目錄下的檔案,然後判斷是否是檔案下,是就遞迴,不是就獲取檔案絕對路徑,因為text方法只支援絕對檔案路徑讀取單個檔案。
程式碼寫的不是很好哈,有時間了在優化一下,主要就是看一下怎麼用就好。不讀取帶@的,是因為我的系統裡面帶這個檔名的都是通知檔案,不是我想要的。
// 這段程式碼主要是讀取afs上的檔案,hdfs的也一樣
AFS_URL = "afs://xxx.xxx.xxx";
public Dataset<Row> readFilesByPath(String path) {
FileSystem fileSystem = null;
Configuration conf = new Configuration();
conf.set("fs.defaultFS", AFS_URL);
List<Path> filePathList = new ArrayList<>();
try {
fileSystem = FileSystem.get(conf);
System.out.println("filePath:" + path);
getFilesInPath(fileSystem, filePathList, path);
} catch (IOException e) {
e.printStackTrace();
}
Dataset<Row> stringDataset = null;
for (Path filePath : filePathList) {
String sulotionPath = filePath.toString();
stringDataset = stringDataset == null ?
spark.read().text(sulotionPath) : stringDataset.union(spark.read().text(sulotionPath));
}
return stringDataset;
}
/**
* 遞迴獲取指定目錄下的所有檔案絕對路徑
*/
private void getFilesInPath(FileSystem fileSystem, List<Path> filesInPath, String filePath)
throws IOException {
Path path = new Path(filePath);
FileStatus[] files = fileSystem.listStatus(path);
for (FileStatus file : files) {
if (file.isDirectory()) {
getFilesInPath(fileSystem, filesInPath, file.getPath().toString());
} else {
// 不解析檔名帶有@符號的
if (file.getPath().getName().contains("@")) {
continue;
}
System.out.println(file.getPath().toString());
filesInPath.add(file.getPath());
}
}
}