Spark Streaming: receiving Kafka data and writing it to HBase
阿新 · Published 2018-12-25
Requirements
Kafka + Spark Streaming + Spark SQL + HBase
Output the top-5 word-count ranking.
The rank is used as the RowKey; word and count are stored as Columns.
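Before the job can write its output, the target table has to exist in HBase. Below is a minimal sketch using the HBase 1.x Admin API, assuming the table name window and column family cf that the write code later in this post uses; the object name CreateWindowTable is only for illustration.
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateWindowTable {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin
    val tableName = TableName.valueOf("window") // table written to by the code below
    if (!admin.tableExists(tableName)) {
      val desc = new HTableDescriptor(tableName)
      desc.addFamily(new HColumnDescriptor("cf")) // single column family holding "word" and "count"
      admin.createTable(desc)
    }
    admin.close()
    conn.close()
  }
}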
Implementation
Create a Kafka producer that simulates randomly generated data
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object producer {
  def main(args: Array[String]): Unit = {
    val topic = "words"
    val brokers = "master:9092,slave1:9092,slave2:9092"
    val prop = new Properties()
    prop.put("metadata.broker.list", brokers)
    prop.put("serializer.class", "kafka.serializer.StringEncoder")
    val kafkaConfig = new ProducerConfig(prop)
    val producer = new Producer[String, String](kafkaConfig)
    // five fixed sentences; one is picked at random and sent every 200 ms
    val content: Array[String] = new Array[String](5)
    content(0) = "kafka kafka produce"
    content(1) = "kafka produce message"
    content(2) = "hello world hello"
    content(3) = "wordcount topK topK"
    content(4) = "hbase spark kafka"
    while (true) {
      val i = (math.random * 5).toInt
      producer.send(new KeyedMessage[String, String](topic, content(i)))
      println(content(i))
      Thread.sleep(200)
    }
  }
}
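For reference, a rough build.sbt sketch of the dependencies these snippets rely on; the artifact versions are assumptions (Spark 1.x with the Kafka 0.8 direct-stream integration and an HBase 1.x client) and should be matched to your cluster.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.3",
  "org.apache.spark" %% "spark-sql"             % "1.6.3",
  "org.apache.spark" %% "spark-streaming"       % "1.6.3",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",  // KafkaUtils.createDirectStream (Kafka 0.8 API)
  "org.apache.kafka" %% "kafka"                 % "0.8.2.2", // old Scala producer API (Producer / KeyedMessage)
  "org.apache.hbase" %  "hbase-client"          % "1.2.6",
  "org.apache.hbase" %  "hbase-server"          % "1.2.6"    // TableOutputFormat lives here
)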
Create the Spark Streaming context
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setMaster("local[2]").setAppName("Networkcount")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1)) // 1-second batch interval
Configure Kafka and read the incoming data with KafkaUtils.createDirectStream
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
val topic = Set("words")
val brokers = "master:9092,slave1:9092,slave2:9092"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
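A side note on the direct stream: the Kafka offsets consumed in each batch are exposed through HasOffsetRanges, which can be useful for logging or manual offset tracking. A small sketch; it has to run on the original stream, before window or any other transformation.
import org.apache.spark.streaming.kafka.HasOffsetRanges

kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} offsets=[${o.fromOffset}, ${o.untilOffset})")
  }
}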
Use Spark SQL for the word count and top-N processing, then write the results to HBase
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
var rank = 0 // sequence number of the current row, used as the RowKey
val sqlcontext = new SQLContext(sc)
import sqlcontext.implicits._
kafkaStream.window(Seconds(10), Seconds(3)).flatMap(line => {
  Some(line._2.toString) // keep only the message value
}).foreachRDD({ rdd: RDD[String] =>
  val df = rdd.flatMap(_.split(" ")).toDF.withColumnRenamed("_1", "word")
  df.registerTempTable("words")
  val ans = sqlcontext.sql("select word, count(*) as total from words group by word order by count(*) desc").limit(5).map(x => {
    // rank is incremented inside the map closure; with only five rows this
    // numbers the results 1..5 (relies on local mode / a single partition)
    rank += 1
    (rank, x.getString(0), x.getLong(1))
  })
  rank = 0
Writing the data to HBase, method one (batch write)
ans.map(x => {
  val put = new Put(Bytes.toBytes(x._1.toString))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
  put
}).foreachPartition(puts => {
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create)
  val table = conn.getTable(TableName.valueOf("window"))
  // alternative way to obtain the table (old API):
  // var jobConf = new JobConf(HBaseConfiguration.create)
  // val table = new HTable(jobConf, TableName.valueOf("window"))
  import scala.collection.JavaConversions._
  table.put(seqAsJavaList(puts.toSeq)) // one batched put per partition
  table.close()
  conn.close()
})
Writing the data to HBase, method two (single-record write)
ans.foreachPartition(partitionRecords => {
  val tablename = "window"
  val hbaseconf = HBaseConfiguration.create()
  val conn = ConnectionFactory.createConnection(hbaseconf)
  val tableName = TableName.valueOf(tablename)
  val table = conn.getTable(tableName)
  partitionRecords.foreach(x => {
    val put = new Put(Bytes.toBytes(x._1.toString))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
    table.put(put) // one RPC per record, hence the differing timestamps
  })
  table.close()
  conn.close()
})
})
Using saveAsHadoopDataset
saveAsHadoopFile saves an RDD to files on HDFS and supports the old Hadoop API.
saveAsHadoopDataset saves an RDD to storage systems other than HDFS, such as HBase.
var jobConf = new JobConf(HBaseConfiguration.create)
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "window")
jobConf.setOutputFormat(classOf[TableOutputFormat]) // without this line the job fails with "Undefined job output-path"
// In a JobConf there are usually five settings to pay attention to:
// the output path, the key class, the value class, the RDD's OutputFormat, and the compression options
ans.map(x => {
  val put = new Put(Bytes.toBytes(x._1.toString))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
  (new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
Using the new API: saveAsNewAPIHadoopDataset
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "lxw1234") // note: a different table name from the "window" table used above
var job = new Job(sc.hadoopConfiguration) // the Job constructor is deprecated; Job.getInstance(conf) is the preferred form
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result]) // the values actually written are Put objects
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) // this line fails to compile, reason unclear; most likely the old-API org.apache.hadoop.hbase.mapred.TableOutputFormat (used with JobConf above) is the one in scope, while the new API needs the generic org.apache.hadoop.hbase.mapreduce.TableOutputFormat
ans.map(x => {
  val put = new Put(Bytes.toBytes(x._1.toString))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(x._2.toString))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(x._3.toString))
  (new ImmutableBytesWritable, put)
}).saveAsNewAPIHadoopDataset(job.getConfiguration) // at runtime this throws a NullPointerException
Start the streaming context
ssc.start()
ssc.awaitTermination() // wait for the processing to stop; call stop() to stop manually
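A note on stopping manually: instead of blocking indefinitely in awaitTermination(), the context can wait with a timeout and then be stopped gracefully. A minimal sketch; the ten-minute timeout is an arbitrary example.
ssc.awaitTerminationOrTimeout(10 * 60 * 1000L) // run for at most 10 minutes
ssc.stop(stopSparkContext = true, stopGracefully = true) // let in-flight batches finish before shutting down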
Run
The producer generates data.
HBase is updated in real time.
With the batch write, the timestamps within a batch are identical.
With the single-record write, the timestamps differ.
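To inspect the rows and compare the timestamps of the batch and single-record writes, a minimal scan over the window table can be used; the object name ScanWindow is only for illustration.
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConversions._

object ScanWindow {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = conn.getTable(TableName.valueOf("window"))
    val scanner = table.getScanner(new Scan())
    for (result <- scanner; cell <- result.rawCells()) {
      println(s"row=${Bytes.toString(CellUtil.cloneRow(cell))} " +
        s"column=cf:${Bytes.toString(CellUtil.cloneQualifier(cell))} " +
        s"value=${Bytes.toString(CellUtil.cloneValue(cell))} " +
        s"timestamp=${cell.getTimestamp}")
    }
    scanner.close()
    table.close()
    conn.close()
  }
}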