SparkStreaming部分:OutputOperator類,saveAsHadoopFiles運算元(實際上底層呼叫textFileStream讀取的,跟前兩種有一些區別)【Java版純程式碼】
阿新 • • 發佈:2018-12-30
package streamingOperate.output;

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * Example of the {@code saveAsHadoopFiles(prefix, [suffix])} DStream output operation.
 *
 * <p>{@code saveAsHadoopFiles} saves the contents of a DStream as Hadoop files. The file name
 * generated for each batch is built from the prefix and suffix:
 * {@code "prefix-TIME_IN_MS[.suffix]"}.
 *
 * <p>A plain RDD can be written with {@code saveAsTextFile(path)}, but a DStream only offers
 * {@code saveAsTextFiles()}, whose parameters are just a prefix and a suffix and no separate
 * path. {@code saveAsTextFiles} in turn calls the RDD's {@code saveAsTextFile}, so the target
 * directory has to be included in the prefix itself.
 */
public class Operate_saveAsHadoopFiles {

    /**
     * Builds a streaming job that reads text files from the {@code data} directory, splits every
     * line on single spaces, maps each token to a {@code (token, 1)} pair and writes every batch
     * out through {@code saveAsHadoopFiles}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local")
                .setAppName("Operate_saveAsHadoopFiles");
        // One batch every 5 seconds.
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

        try {
            JavaDStream<String> textFileStream = jsc.textFileStream("data");

            // Split each input line into tokens separated by a single space.
            JavaDStream<String> flatMap = textFileStream.flatMap(
                    new FlatMapFunction<String, String>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Iterable<String> call(String t) throws Exception {
                            return Arrays.asList(t.split(" "));
                        }
                    });

            // Pair every (trimmed) token with the count 1.
            JavaPairDStream<String, Integer> mapToPair = flatMap.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, Integer> call(String t) throws Exception {
                            return new Tuple2<String, Integer>(t.trim(), 1);
                        }
                    });

            // Writes into the "savedata" directory under the current path; output names start
            // with "prefix" and end with "suffix".
            //
            // Example of an HDFS target:
            //   mapToPair.saveAsHadoopFiles("hdfs://node1:9000/log/prefix", "suffix",
            //           Text.class, IntWritable.class, TextOutputFormat.class);
            // Example of a local target using Windows separators:
            //   mapToPair.saveAsHadoopFiles(".\\savedata\\prefix", "suffix",
            //           Text.class, IntWritable.class, TextOutputFormat.class);
            //
            // The same local target can also be written with forward slashes, as done here.
            // NOTE(review): the stream carries String/Integer pairs while Text/IntWritable are
            // passed as the key/value classes; presumably TextOutputFormat serialises them via
            // toString() - confirm against the Spark/Hadoop versions in use.
            mapToPair.saveAsHadoopFiles("./savedata/prefix", "suffix",
                    Text.class, IntWritable.class, TextOutputFormat.class);

            jsc.start();
            jsc.awaitTermination();
        } finally {
            // Close the context even when start/awaitTermination fails.
            jsc.close();
        }
    }
}