Spark Streaming in Practice: Consuming Kafka Directly and Saving to HDFS
I've been learning Spark Streaming recently, so here is a summary of a hands-on exercise: consuming Kafka with the direct (receiver-less) approach and writing each micro-batch to HDFS.
The main code is as follows:
```scala
import kafka.serializer.StringDecoder
import org.apache.hadoop.io.Text
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Factory for StreamingContext.getOrCreate: builds a fresh context
// (with checkpointing enabled) when no checkpoint exists yet.
def createStreamingContext(): StreamingContext = {
  val sparkConf = new SparkConf()
    .setAppName("myStreamingText")
    .setMaster(ConfigInfo.MasterConfig)
  sparkConf.set("spark.streaming.kafka.maxRetries", "100")
  sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
  val ssc = new StreamingContext(sparkConf, Seconds(ConfigInfo.durationConfig))
  ssc.checkpoint(ConfigInfo.checkpointConfig)
  ssc
}

// Create a direct (receiver-less) Kafka stream of (key, value) pairs.
def createKafkaDstream(ssc: StreamingContext,
                       kafkaParams: Map[String, String],
                       topics: Set[String]): InputDStream[(String, String)] = {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)
}

// Save one micro-batch to HDFS as Snappy block-compressed output.
def saveToHDFS(rdd: RDD[String]): Unit = {
  // Derive a key for each record; SparkUtil.fetchKey is the author's helper.
  def convertData(line: String): (Text, Text) = {
    val key = SparkUtil.fetchKey(line)
    (new Text(key), new Text(line))
  }
  val dataToSaveHDFS: RDD[(Text, Text)] = rdd.map(convertData)

  val hadoopConf = rdd.context.hadoopConfiguration
  hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
  hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec",
    "org.apache.hadoop.io.compress.SnappyCodec")
  hadoopConf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

  dataToSaveHDFS.saveAsNewAPIHadoopFile(
    ConfigInfo.saveHdfsPathConfig,
    classOf[Text], classOf[Text],
    classOf[StreamingDataOutputFormat[Text, Text]])
}

// Caveat: for the graph to be rebuilt from the checkpoint on restart,
// the DStream setup below should really live inside createStreamingContext;
// it is kept here to match the original structure.
val ssc = StreamingContext.getOrCreate(ConfigInfo.checkpointConfig, createStreamingContext)
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> ConfigInfo.brokerListConfig,
  "group.id" -> ConfigInfo.groupIdConfig)
// The original built the topic set from ConfigInfo.groupIdConfig, which looks
// like a copy-paste slip; a topic setting (name assumed) is used instead.
val topics = Set[String](ConfigInfo.topicConfig)
val dStream = createKafkaDstream(ssc, kafkaParams, topics)

// Wire the stream to the HDFS sink and run (absent from the original listing).
dStream.map(_._2).foreachRDD(rdd => saveToHDFS(rdd))
ssc.start()
ssc.awaitTermination()
```
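The code reads all of its settings from a `ConfigInfo` object that is not shown. A minimal sketch of what it might look like, assuming plain hard-coded values; every name's value below is an illustrative placeholder, not something from the original post:

```scala
// Hypothetical sketch: the original post does not show ConfigInfo,
// so all values below are illustrative placeholders.
object ConfigInfo {
  val MasterConfig: String = "yarn"                               // Spark master
  val durationConfig: Int = 10                                    // batch interval, seconds
  val checkpointConfig: String = "hdfs:///user/spark/checkpoint"  // checkpoint directory
  val brokerListConfig: String = "broker1:9092,broker2:9092"      // Kafka broker list
  val groupIdConfig: String = "my-consumer-group"                 // consumer group id
  val topicConfig: String = "my-topic"                            // topic to consume (assumed field)
  val saveHdfsPathConfig: String = "hdfs:///user/spark/output"    // HDFS output base path
}
```

In a real job these would typically be loaded from a properties file or spark-submit arguments rather than hard-coded.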
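`StreamingDataOutputFormat` is likewise a custom class the post does not include. In a streaming job, a plain `saveAsNewAPIHadoopFile` fails on the second batch because the output directory already exists, so such a class usually extends `TextOutputFormat` and relaxes that check. A sketch under that assumption; the class body is my guess, not the author's code:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, TextOutputFormat}

// Hypothetical sketch: the original StreamingDataOutputFormat is not shown.
class StreamingDataOutputFormat[K, V] extends TextOutputFormat[K, V] {

  // Skip the "output directory already exists" check so successive
  // micro-batches can keep writing under the same base path.
  override def checkOutputSpecs(context: JobContext): Unit = ()

  // Give each batch's part files a timestamped name so batches
  // don't overwrite each other (naming scheme is illustrative).
  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
    val committer = getOutputCommitter(context).asInstanceOf[FileOutputCommitter]
    val taskId = context.getTaskAttemptID.getTaskID.getId
    new Path(committer.getWorkPath, f"part-${System.currentTimeMillis()}-$taskId%05d$extension")
  }
}
```

Whatever the real implementation does, the key design point is the same: the output format, not the driver code, decides how repeated batches share one HDFS directory.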