1. 程式人生 > 實用技巧 >scala spark streaming 列印kafka 資料

scala spark streaming 列印kafka 資料

how-to-fix-java-io-notserializableexception-org-apache-kafka-clients-consumer

The Kafka ConsumerRecord object is received from the DStream. When you try to print it, you get an error because that object is not serializable. Instead, you should extract the values from the ConsumerRecord object and print those.

參考連結:https://stackoverflow.com/questions/40570874/how-to-fix-java-io-notserializableexception-org-apache-kafka-clients-consumer

1、獲取kafka資料

/**
 * Companion class of the `WindowsFunction` object defined in this file.
 * It declares no members of its own.
 *
 * @author xlxxx
 * @date xxxx 16:49
 * @version 1.0
 */
class WindowsFunction

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, LocationStrategies}
//import org.apache.spark.streaming.kafka.KafkaUtil
//import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WindowsFunction {

  /**
   * Computes each user's average payment from the per-user payment total and
   * the per-user payment count.
   *
   * @param sum   stream of (user, total payment amount) pairs
   * @param count stream of (user, number of payments) pairs
   * @return stream of (user, total / count) pairs produced by joining the two inputs on user
   */
  def avgFunction(sum: DStream[(String, Double)], count: DStream[(String, Int)]): DStream[(String, Double)] =
    sum.join(count).map { case (user, (total, times)) => (user, total / times) }

  /**
   * Entry point: subscribes to a Kafka topic through the direct (kafka010) stream API
   * and prints the value of every record received in each 5-second batch.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {

    // Factory handed to StreamingContext.getOrCreate: builds the context and wires up the Kafka stream.
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("test").setMaster("local[*]")
      val ssc = new StreamingContext(conf, Seconds(5))

//      val zkQuorum = "localhost:2181,192.168.6.56:2181,192.168.6.57:2181"
      // Kafka bootstrap server address (the direct stream talks to brokers, not to ZooKeeper).
      val brokers = "localhost:9092"
      val consumerGroupName = "user_payment"
      val kafkaTopic = "testkafka"
      val kafkaThreadNum = 1

      // Only printed for debugging; the direct stream below does not consume this map.
      val topicMap = kafkaTopic.split(",").map((_, kafkaThreadNum)).toMap
      println(topicMap)
//      val user_payment = KafkaUtils.createDirectStream(ssc, zkQuorum, consumerGroupName, topicMap).map(x=>{
//        parse(x._2)
//      })

      val topicsSet = kafkaTopic.split(",").toSet
      val kafkaParams = Map[String, Object](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
        ConsumerConfig.GROUP_ID_CONFIG -> consumerGroupName,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
      val user_payment = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
     //from old
//     val Array(brokers, groupId, topics) = args
//      val messages = KafkaUtils.createDirectStream[String, String](
//        ssc,
//        LocationStrategies.PreferConsistent,
//        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

      //from olds
      // ConsumerRecord is not serializable, so print the extracted value rather than
      // the record object itself.
      user_payment.foreachRDD { rdd =>
        rdd.foreach(record => println(record.value()))
      }
//      user_payment.map(jsonLine => print("========"+jsonLine))
//      user_payment.map(record=>(record.value().toString)).print
//      user_payment.print()
      // Compute each user's total payment for every 5s batch
//      val paymentSum = user_payment.map(jsonLine =>{
////        implicit val formats = DefaultFormats
//        println(jsonLine)
////        val user = (jsonLine \ "user").extract[String]
////        val payment = (jsonLine \ "payment").extract[String]
////        (user,payment)
////        ('user',1)
//      }).flatMap((_.split(" "))).reduceByKey(_+_)
//      val paymentSum = user_payment.map(_.value).flatMap().reduceByKey(_+_)

      // Print the result
//      paymentSum.print()
//
//      // Compute each user's number of payments for every 5s batch
//      val paymentCount = user_payment.map(jsonLine =>{
//        implicit val formats = DefaultFormats
//        val user = (jsonLine \ "user").extract[String]
//        (user,1)
//      }).reduceByKey(_+_)
//
//      //      paymentCount.print()
//
//      // Compute each user's average payment for every 5s batch
//      val paymentAvg = avgFunction(paymentSum,paymentCount)
//      //      paymentAvg.print()

      // Window operations: compute results over different time ranges here;
      // whether and how to persist them depends on the use case.
//      def windowsFunction()  {
//        // Every 5 seconds, compute each user's payment amount over the last 30 seconds
//        val windowSum_30 = paymentSum.reduceByKeyAndWindow((a: Double, b: Double) => (a + b),_-_, Seconds(30), Seconds(5))
//        //        windowSum_30.print()
//
//        // Every 5 seconds, compute each user's number of payments over the last 30 seconds
//        val windowCount_30 = paymentCount.reduceByKeyAndWindow((a: Int, b: Int) => (a + b),_-_, Seconds(30), Seconds(5))
//        //        windowCount_30.print()
//
//        // Every 5 seconds, compute each user's average payment over the last 30 seconds
//        val windowAvg_30 = avgFunction(windowSum_30,windowCount_30)
//        //        windowAvg_30.print()
//
//        // Every 5 seconds, compute each user's payment amount over the last 60 seconds
//        val windowSum_60 = windowSum_30.reduceByKeyAndWindow((a:Double,b:Double)=>(a+b),_-_,Seconds(10),Seconds(5))
//        //       windowSum_60.print()
//
//        // Every 5 seconds, compute each user's number of payments over the last 60 seconds
//        val windowCount_60 = windowCount_30.reduceByKeyAndWindow((a:Int,b:Int) => (a+b),_-_,Seconds(10),Seconds(5))
//        //        windowCount_60.print()
//
//        // Every 5 seconds, compute each user's average payment over the last 60 seconds
//        val windowAvg_60 = avgFunction(windowSum_60,windowCount_60)
//        //        windowAvg_60.print
//      }
//
//      windowsFunction()

      ssc
    }

    // NOTE(review): the factory above never calls ssc.checkpoint("checkPoint"), so it looks
    // like no checkpoint data is ever written to this directory and a fresh context is
    // created on each start -- confirm whether checkpointing was intended.
    val context = StreamingContext.getOrCreate("checkPoint", functionToCreateContext _)

    context.start()
    context.awaitTermination()
  }
}

 2、debug 截圖:

//map 列印