Spark: reading data from Kafka and writing it to HBase
Posted by 阿新 · 2019-01-28
package com.prince.demo.test

import java.util.UUID

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by prince on 2018/5/17.
  */
object ReadKafka {

  Logger.getLogger("org").setLevel(Level.WARN)
  implicit val conf: Config = ConfigFactory.load

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ReadKafka").master("local[*]").getOrCreate()
    val sparkContext = spark.sparkContext
    val ssc = new StreamingContext(sparkContext, Seconds(10))

    // Kafka consumer settings; brokers and group id come from the Typesafe config file
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> conf.getString("kafka.brokers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> conf.getString("kafka.group"),
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topic = "topic883"
    val topics = Array(topic)

    // Direct stream over the Kafka topic
    val stream = KafkaUtils
      .createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // HBase / ZooKeeper connection settings and output table configuration
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "slave5.hadoop,slave6.hadoop,slave7.hadoop")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

    val tableName = "circle"
    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Extract the message value (a JSON string) from each Kafka record
    val input = stream.flatMap(line => {
      Some(line.value.toString)
    })

    input.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Parse the JSON strings with Spark SQL and pull out the fields to store
        val spark1 = SparkSession.builder.getOrCreate
        val df = spark1.read.json(rdd)
        df.createOrReplaceTempView("temp")
        val ans = spark1.sql("select time,token from temp").rdd.map(x => {
          (UUID.randomUUID.toString, x.getString(0), x.getString(1))
        })
        // Build one Put per record (row key = random UUID; note the first selected
        // field, time, is stored under the "id" column) and write to HBase
        ans.map(line => {
          val put = new Put(Bytes.toBytes(line._1))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes(line._2))
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("token"), Bytes.toBytes(line._3))
          (new ImmutableBytesWritable, put)
        }).saveAsHadoopDataset(jobConf)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
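The streaming job assumes the circle table with column family cf already exists; TableOutputFormat does not create it. Below is a minimal sketch of creating that table through the HBase 1.x Admin API, reusing the same ZooKeeper quorum as the job. The object name CreateCircleTable is only for illustration, not part of the original project.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Hypothetical helper: create the "circle" table with column family "cf" if it is missing
object CreateCircleTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "slave5.hadoop,slave6.hadoop,slave7.hadoop")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      val table = TableName.valueOf("circle")
      if (!admin.tableExists(table)) {
        val descriptor = new HTableDescriptor(table)
        descriptor.addFamily(new HColumnDescriptor("cf"))
        admin.createTable(descriptor)
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}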
The messages transmitted through Kafka have the following format:
{"id":"b2e4a1bb-741b-4700-8b03-18c06a258","token":"h981bd53a475b4edc9b0ad5f72870b03","time":"1503364337536"}
Simulating the process of writing data into Kafka:
package com.prince.demo.kafka

import java.util.Properties

import com.typesafe.config.ConfigFactory
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.log4j.{Level, Logger}
import org.codehaus.jettison.json.JSONObject

import scala.util.Random

/**
  * Generates mock data
  * Created by prince on 2017/9/11.
  */
object KafkaProducerData {

  Logger.getLogger("org").setLevel(Level.WARN)

  private val userid = Array(
    "b2e4a1bb-741b-4700-8b03-18c06a298", "b2e4a1bb-741b-4700-8b03-18c06a232",
    "b2e4a1bb-741b-4700-8b03-18c06a224", "b2e4a1bb-741b-4700-8b03-18c06a258",
    "b2e4a1bb-741b-4700-8b03-18c06a280", "b2e4a1bb-741b-4700-8b03-18c06a248",
    "b2e4a1bb-741b-4700-8b03-18c06a275", "b2e4a1bb-741b-4700-8b03-18c06a266",
    "b2e4a1bb-741b-4700-8b03-18c06a268", "b2e4a1bb-741b-4700-8b03-18c06a212"
  )

  private val tokens = Array(
    "v2ff37ca54eda70e0c1b8902626cb6dd", "fb751fb989ce159e3ee5149927176474",
    "af89557c629d6b7af43378df4b8f30d9", "n3f164f9e9999eefa13064ac1e864fd8",
    "zbd6f5791a99249c3a513b21ce835038", "dc6470493c3c891db6f63326b19ef482",
    "k2917b1b391186ff8f032f4326778ef7", "ca796f74ee74360e169fc290f1e720c7",
    "h981bd53a475b4edc9b0ad5f72870b03", "p4064d445c9f4ff4d536dfeae965aa95"
  )

  private val time = Array(
    "1503364335202", "1503364335776", "1503364336578", "1503364337536", "1503364336340",
    "1503364335832", "1503364336726", "1503364336387", "1503364336691", "1503364335857"
  )

  private val random = new Random()

  // Pick a random index into the sample arrays
  def point(): Int = {
    random.nextInt(10)
  }

  def getId(): String = {
    userid(point())
  }

  def getToken(): String = {
    tokens(point())
  }

  def getTime(): String = {
    time(point())
  }

  def main(args: Array[String]): Unit = {
    implicit val conf = ConfigFactory.load
    val topic = "topic883"
    val brokers = conf.getString("kafka.brokers")

    // Configure the (old) Scala producer API
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    // Emit one random JSON record per second
    while (true) {
      val event = new JSONObject()
      event
        .put("id", getId())
        .put("token", getToken())
        .put("time", getTime())
      producer.send(new KeyedMessage[String, String](topic, "key", event.toString))
      println("Message sent: " + event)
      Thread.sleep(1000)
    }
  }
}
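The producer above relies on the old Scala producer API (kafka.producer.Producer), which has long been deprecated and was removed in newer Kafka releases. A minimal sketch of sending the same kind of record with the Java client's org.apache.kafka.clients.producer.KafkaProducer is shown below; the broker address is a placeholder, and in practice it would come from the same kafka.brokers config key.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Sketch of the producer using the current Java client API instead of kafka.producer.Producer
object KafkaProducerDataNewApi {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "slave5.hadoop:9092") // placeholder; use the kafka.brokers value
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    val message =
      """{"id":"b2e4a1bb-741b-4700-8b03-18c06a258","token":"h981bd53a475b4edc9b0ad5f72870b03","time":"1503364337536"}"""

    // Send a single record to the same topic the streaming job subscribes to
    producer.send(new ProducerRecord[String, String]("topic883", "key", message))
    producer.flush()
    producer.close()
  }
}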