
Spark Streaming: receiving data from Kafka and writing it to HBase

Requirements

Kafka + Spark Streaming + Spark SQL + HBase
Output the TOP-5 ranking results
Use the rank as the RowKey, with word and count as the columns
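
The snippets below write to an HBase table named window with a single column family cf (both names come from the code later in the post). A minimal sketch, not part of the original post, for creating that table with the HBase 1.x client API, assuming hbase-site.xml is on the classpath:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateWindowTable {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin
    val name = TableName.valueOf("window")
    if (!admin.tableExists(name)) {
      // a single column family "cf" holds the word and count columns
      val desc = new HTableDescriptor(name)
      desc.addFamily(new HColumnDescriptor("cf"))
      admin.createTable(desc)
    }
    admin.close()
    conn.close()
  }
}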

Implementation

Create a Kafka producer that simulates randomly generated data

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object producer {
  def main(args: Array[String]): Unit = {
    val topic = "words"
    val brokers = "master:9092,slave1:9092,slave2:9092"
    // old (Kafka 0.8) producer API configuration
    val prop = new Properties()
    prop.put("metadata.broker.list", brokers)
    prop.put("serializer.class", "kafka.serializer.StringEncoder")
    val kafkaConfig = new ProducerConfig(prop)
    val producer = new Producer[String, String](kafkaConfig)
    val content: Array[String] = new Array[String](5)
    content(0) = "kafka kafka produce"
    content(1) = "kafka produce message"
    content(2) = "hello world hello"
    content(3) = "wordcount topK topK"
    content(4) = "hbase spark kafka"
    while (true) {
      // pick one of the five sentences at random and send it to the topic
      val i = (math.random * 5).toInt
      producer.send(new KeyedMessage[String, String](topic, content(i)))
      println(content(i))
      Thread.sleep(200)
    }
  }
}

Create the Spark Streaming context

val conf = new SparkConf().setMaster("local[2]").setAppName("Networkcount")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1)) // 1-second batch interval

Configure Kafka and read the incoming data with KafkaUtils.createDirectStream

val topic = Set("words")
val brokers = "master:9092,slave1:9092,slave2:9092"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
// direct stream: each record arrives as a (key, message) pair
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
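
createDirectStream with StringDecoder is the Kafka 0.8 direct API, so the project needs the spark-streaming-kafka artifact in addition to the Spark and HBase clients. A hedged build.sbt sketch of the dependencies (the versions are assumptions, not taken from the original post; adjust them to your cluster):

// build.sbt (versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.2",
  "org.apache.spark" %% "spark-sql"             % "1.6.2",
  "org.apache.spark" %% "spark-streaming"       % "1.6.2",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
  "org.apache.hbase" %  "hbase-client"          % "1.2.6",
  "org.apache.hbase" %  "hbase-common"          % "1.2.6",
  "org.apache.hbase" %  "hbase-server"          % "1.2.6"   // TableOutputFormat lives here
)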

Use Spark SQL for the word count and top-N processing, then write the results to HBase

    var rank = 0 // tracks the rank (row number) of the current record
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.implicits._

    kafkaStream.window(Seconds(10), Seconds(3)).flatMap(line => {
      // the message body is the value of each (key, value) pair coming from Kafka
      Some(line._2.toString)
    }).foreachRDD({ rdd: RDD[String] =>

      // split each line into words and turn the RDD into a single-column DataFrame
      val df = rdd.flatMap(_.split(" ")).toDF.withColumnRenamed("_1", "word")
      df.registerTempTable("words")

      // word count ordered by frequency; keep the top 5 and attach a rank
      val ans = sqlcontext.sql("select word, count(*) as total from words group by word order by count(*) desc").limit(5).map(x => {
        rank += 1
        (rank, x.getString(0), x.getLong(1))
      })
      rank = 0

Method 1 for writing data to HBase (batch write)

      ans.map(x => {
        // one Put per record: the rank is the RowKey, word and count go into family "cf"
        val put = new Put(Bytes.toBytes(x._1.toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
        put
      }).foreachPartition(x => {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create)
        val table = conn.getTable(TableName.valueOf("window"))
        // an alternative way to obtain the table:
        //   var jobConf = new JobConf(HBaseConfiguration.create)
        //   val table = new HTable(jobConf, TableName.valueOf("window"))
        import scala.collection.JavaConversions._
        table.put(seqAsJavaList(x.toSeq)) // write all Puts of this partition in one batch
        table.close()
        conn.close()
      })

Method 2 for writing data to HBase (single-record writes)

      ans.foreachPartition(partitionRecords => {
        val tablename = "window"
        val hbaseconf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(hbaseconf)
        val tableName = TableName.valueOf(tablename)
        val table = conn.getTable(tableName)
        partitionRecords.foreach(x => {
          // one Put per record, written immediately
          val put = new Put(Bytes.toBytes(x._1.toString))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
          table.put(put)
        })
        table.close()
        conn.close()
      })
    })

Using saveAsHadoopDataset
saveAsHadoopFile saves an RDD to files on HDFS and uses the old Hadoop API;
saveAsHadoopDataset saves an RDD to storage systems other than HDFS, such as HBase.

      val jobConf = new JobConf(HBaseConfiguration.create)
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, "window")
      jobConf.setOutputFormat(classOf[TableOutputFormat]) // without this line the job fails with "Undefined job output-path"
      // A JobConf usually needs five things set or checked: the output path, the key class,
      // the value class, the OutputFormat of the RDD, and compression-related settings.
      ans.map(x => {
        val put = new Put(Bytes.toBytes(x._1.toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
        (new ImmutableBytesWritable, put)
      }).saveAsHadoopDataset(jobConf)

Using the new API: saveAsNewAPIHadoopDataset

      sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "lxw1234")
      var job = new Job(sc.hadoopConfiguration)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Result])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) // this line raises an error; the cause is unclear
      ans.map(x => {
        val put = new Put(Bytes.toBytes(x._1.toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
        (new ImmutableBytesWritable, put)
      }).saveAsNewAPIHadoopDataset(job.getConfiguration) // running this throws a NullPointerException
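
Two things in the snippet above are worth flagging: the old-API TableOutputFormat (org.apache.hadoop.hbase.mapred) and the new-API one (org.apache.hadoop.hbase.mapreduce) are easy to mix up, and the output value class should be the Put we actually emit rather than Result. A hedged sketch of the new-API path under those assumptions, targeting the same window table as the rest of the post (my guess at a fix, not verified against the original environment):

      // assumes: import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
      //          import org.apache.hadoop.mapreduce.Job
      sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "window")
      val job = Job.getInstance(sc.hadoopConfiguration)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Put]) // the value we emit is a Put, not a Result
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
      ans.map(x => {
        val put = new Put(Bytes.toBytes(x._1.toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
        (new ImmutableBytesWritable, put)
      }).saveAsNewAPIHadoopDataset(job.getConfiguration)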

Start the Spark Streaming context

ssc.start()
ssc.awaitTermination() // wait for the computation to finish; call stop() to terminate manually

Run

The producer generating data:

(screenshot)

HBase data updated in real time:

Batch write: all cells of a row share the same timestamp.

(screenshot)

Single-record writes: the timestamps differ slightly.

(screenshot)