Spark Streaming 整合 Kafka:直連(direct)模式
// Shared driver-side setup: HDFS checkpoint directory and a local SparkContext.
// FIX: the original snippet used typographic quotes (“…”) from a copy/paste,
// which are not valid Scala string delimiters and fail to compile; replaced
// with plain ASCII double quotes. Names and values are unchanged.
val checkpoint = "hdfs://bdha/checkpoint"
val conf = new SparkConf().setMaster("local").setAppName("AdRealStatJob")
val sc = new SparkContext(conf)
// Builds the StreamingContext for the Kafka direct-mode stream (used with
// StreamingContext.getOrCreate-style checkpoint recovery, presumably — the
// caller is outside this chunk; confirm).
// NOTE(review): the typographic quotes (“ ”) below are copy/paste artifacts
// and must be replaced with plain " for this code to compile.
def createFunction(): StreamingContext = {
// 5-second micro-batch interval
val ssc = new StreamingContext(conf, Seconds(5))
// Checkpointing lets offsets/state survive a driver restart
ssc.checkpoint(checkpoint)
val sqlContext =new SQLContext(sc)
val topics =Set(“Test”)
val kafkaParams = Map(
“metadata.broker.list” -> “centos01:9092,centos02:9092,centos03:9092”,// Kafka broker cluster
// "largest" = start from the newest offsets when no committed offset exists
“auto.offset.reset” -> “largest”
)
// NOTE(review): `.map(.2)` is garbled — it should be `.map(_._2)` to keep the
// message value from the (key, value) tuples emitted by createDirectStream.
val messages: DStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(.2)
// Cache the stream since it is consumed by multiple downstream computations
messages.persist(StorageLevel.MEMORY_ONLY)
// Parse each whitespace-separated record: timestamp province city userId adId
val dateUserAd2CountsDStream:DStream[(String, Int)]=messages.map{case message => {
// NOTE(review): "\s+" is an invalid escape in a plain Scala string literal —
// it should be "\\s+" (or a raw/triple-quoted string) so split gets the
// whitespace regex.
val fields = message.split("\s+")
val timestamp = fields(0).toLong
val province = fields(1)
val city = fields(2)
val userId = fields(3)
val adId = fields(4)
// Format the epoch timestamp into a date string for the aggregation key
val date = DateUtils.formatDate(timestamp)
val key = date + "