1. 程式人生 > >Spark實現根據key值來分目錄儲存檔案 多檔案輸出(MultipleOutputFormat)

Spark實現根據key值來分目錄儲存檔案 多檔案輸出(MultipleOutputFormat)

假設我們有這樣的(key,value)資料:

 sc.parallelize(List((20180701, "aaa"), (20180702, "bbb"), (20180701, "ccc")))

我們想把它們存到路徑“output/”下面,而且key值相同的儲存在同一檔案下,不同的key值儲存在不同的檔案下,效果如下

命令:cat output/20180701/part-0000 

檔案內容顯示:

"aaa"

"ccc"

命令:cat output/20180702/part-0000 

檔案內容顯示:

"bbb"

這篇文章跟其他文章不一樣的地方是,輸出的檔案內容不帶Key值!!!這種在大多數場景都是需要的,帶key值輸出到檔案內容當中大多數並不需要。

實現完整程式碼如下:

import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.toString+"/"+name
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)
	sc.parallelize(List((20180701, "aaa"), (20180702, "bbb"), (20180701, "ccc")))
    .saveAsHadoopFile("output", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}

程式碼解析:

1、我們儲存檔案要通過hadoop 的 saveAsHadoopFile來實現,而不是通過Spark的saveAsTextFile,因為後者不能實現多檔案輸出。而且,其實我們去看Spark原始碼就知道,Spark的saveAsTextFile其實也是通過hadoop 的 saveAsHadoopFile來實現的。

2、 saveAsHadoopFile有四個引數,這裡我們只要關注第1個和第4個引數即可。第1個引數是輸出根路徑,第4個引數是輸出多檔案的格式設定包括檔名等。

3、第4個引數我們採用RDDMultipleTextOutputFormat類,在該類中我們重寫了generateFileNameForKeyValue函式(設定輸出檔名根據key和value值)以及generateActualKey函式(設定在檔案內容中是否輸出key值)。

4、generateFileNameForKeyValue函式有3個引數,key和value就是我們RDD的Key和Value,而name引數是每個Reduce的編號即輸出檔名。

5、generateActualKey函式中的 NullWritable.get(),NullWritable是Writable的一個特殊類,實現方法為空實現,不從資料流中讀資料,也不寫入資料,只充當佔位符。

例如要設定map的輸出為<key,空>:

public class IPCountMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String parseIP = LogParseUtil.parseIP(value.toString());
        context.write(new Text(parseIP), NullWritable.get());
    }