Spark實現根據key值來分目錄儲存檔案 多檔案輸出(MultipleOutputFormat)
假設我們有這樣的(key,value)資料:
sc.parallelize(List((20180701, "aaa"), (20180702, "bbb"), (20180701, "ccc")))
我們想把它們存到路徑“output/”下面,而且key值相同的儲存在同一檔案下,不同的key值儲存在不同的檔案下,效果如下
命令:cat output/20180701/part-0000
檔案內容顯示:
"aaa"
"ccc"
命令:cat output/20180702/part-0000
檔案內容顯示:
"bbb"
這篇文章跟其他文章不一樣的地方是,輸出的檔案內容不帶Key值!!!這種在大多數場景都是需要的,帶key值輸出到檔案內容當中大多數並不需要。
實現完整程式碼如下:
import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.toString+"/"+name } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Test").setMaster("local[*]") val sc = new SparkContext(conf) sc.parallelize(List((20180701, "aaa"), (20180702, "bbb"), (20180701, "ccc"))) .saveAsHadoopFile("output", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) sc.stop() } }
程式碼解析:
1、我們儲存檔案要通過hadoop 的 saveAsHadoopFile來實現,而不是通過Spark的saveAsTextFile,因為後者不能實現多檔案輸出。而且,其實我們去看Spark原始碼就知道,Spark的saveAsTextFile其實也是通過hadoop 的 saveAsHadoopFile來實現的。
2、 saveAsHadoopFile有四個引數,這裡我們只要關注第1個和第4個引數即可。第1個引數是輸出根路徑,第4個引數是輸出多檔案的格式設定包括檔名等。
3、第4個引數我們採用RDDMultipleTextOutputFormat類,在該類中我們重寫了generateFileNameForKeyValue函式(設定輸出檔名根據key和value值)以及generateActualKey函式(設定在檔案內容中是否輸出key值)。
4、generateFileNameForKeyValue函式有3個引數,key和value就是我們RDD的Key和Value,而name引數是每個Reduce的編號即輸出檔名。
5、generateActualKey函式中的 NullWritable.get(),NullWritable是Writable的一個特殊類,實現方法為空實現,不從資料流中讀資料,也不寫入資料,只充當佔位符。
例如要設定map的輸出為<key,空>:
public class IPCountMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
String parseIP = LogParseUtil.parseIP(value.toString());
context.write(new Text(parseIP), NullWritable.get());
}