Calculating order turnover: a Kafka / Spark Streaming / ZooKeeper example
Posted by 阿新 · 2018-11-10
package com.ws.streaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object OrderCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("OrderCount").setMaster("local[4]")
    val ssc = new StreamingContext(conf, Duration(5000))

    // Load the IP rules and broadcast them
    val broadcast: Broadcast[Array[(Long, Long, String)]] = generalIpRules(ssc.sparkContext, "")

    // Consumer group
    val group = "group1"
    // Topic to consume
    val topic = "customerOrder"
    // Kafka broker list
    val brokerList = "hadoop-01:9092,hadoop-02:9092,hadoop-03:9092"
    // ZooKeeper quorum, used to store and update the consumed offsets (Redis or MySQL would also work)
    val zkQuorum = "hadoop-01:2181,hadoop-02:2181,hadoop-03:2181"
    // Topic set used when creating the stream; Spark Streaming can consume several topics at once
    val topics: Set[String] = Set(topic)

    // ZKGroupTopicDirs points at the ZooKeeper directory where the offsets are stored
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    // ZooKeeper path, e.g. "/consumers/group1/offsets/customerOrder"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    // Kafka parameters
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      //"deserializer.encoding" -> "GB2312", // encoding used when reading data from Kafka
      // start reading from the earliest offset when no offset has been stored yet
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // ZooKeeper client, used on the Driver to read and update offsets
    val zkClient = new ZkClient(zkQuorum)
    val children = zkClient.countChildren(zkTopicPath)

    var kafkaStream: InputDStream[(String, String)] = null

    // If ZooKeeper already holds offsets, they become the starting position of the stream
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    // Offsets have been stored before
    // Note: the offset lookup happens on the Driver
    if (children > 0) {
      for (i <- 0 until children) {
        // e.g. /consumers/group1/offsets/customerOrder/0 -> 10001
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/$i")
        val tp = TopicAndPartition(topic, i)
        // add each partition's offset to fromOffsets, e.g. customerOrder/0 -> 10001
        fromOffsets += (tp -> partitionOffset.toLong)
      }
      // Each Kafka message is transformed into a (key, message) tuple
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

      // Create a direct DStream through KafkaUtils; fromOffsets makes it resume from the stored offsets
      // Type parameters: key type, value type, key decoder, value decoder, handler result
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // No stored offsets: start from the latest (largest) or earliest (smallest) offset according to kafkaParams
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    // Offset ranges of the current batch
    var offsetRanges = Array[OffsetRange]()

    // With the direct approach, offsets are only available on the KafkaRDD itself, so we cannot
    // read them after a DStream transformation. Instead we call foreachRDD on the kafkaStream,
    // read the offsets from each KafkaRDD, and then process the RDD.
    // When accumulating results with the direct approach, accumulate in an external store
    // (a key-value NoSQL store such as Redis).
    // The body of kafkaStream.foreachRDD runs on the Driver.
    kafkaStream.foreachRDD(kafkaRdd => {
      // skip empty batches
      if (!kafkaRdd.isEmpty()) {
        // only a KafkaRDD can be cast to HasOffsetRanges to obtain the offsets
        offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges

        val lineData = kafkaRdd.map(_._2)
        val fields: RDD[Array[String]] = lineData.map(_.split(" "))

        // total turnover
        CalculateUtil.calculateTotalCount(fields)

        // turnover per item category
        CalculateUtil.calculateSortItem(fields)

        // turnover per province
        val broadcastValue: Array[(Long, Long, String)] = broadcast.value
        CalculateUtil.calculateProvince(fields, broadcastValue)

        // update the offsets
        for (o <- offsetRanges) {
          val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
          // store this partition's offset in ZooKeeper, e.g. /consumers/group1/offsets/customerOrder/0 -> 20000
          ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Build the IP rules and broadcast them
   */
  def generalIpRules(sc: SparkContext, path: String): Broadcast[Array[(Long, Long, String)]] = {
    val pathData = sc.textFile(path)
    val rules: RDD[(Long, Long, String)] = pathData.map(line => {
      val strArr = line.split("[|]")
      val startNum = strArr(2).toLong
      val endNum = strArr(3).toLong
      val province = strArr(6)
      (startNum, endNum, province)
    })
    val rulesCollect = rules.collect()
    sc.broadcast(rulesCollect)
  }
}
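The job above assumes each Kafka message is a space-separated order record in which the second field is the client IP, the third field is the item category, and the last field is the order amount. Below is a minimal test-producer sketch, assuming the standard kafka-clients producer API is on the classpath; the OrderProducer object and the sample record are made up for illustration and are not part of the original post.

package com.ws.streaming

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object OrderProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop-01:9092,hadoop-02:9092,hadoop-03:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Hypothetical record layout matching what OrderCount expects:
    // field(1) = client IP, field(2) = item category, last field = order amount
    val sample = "2018-11-10 125.213.100.123 clothing 99.99"
    producer.send(new ProducerRecord[String, String]("customerOrder", sample))
    producer.close()
  }
}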
package com.ws.streaming

import com.ws.spark.IpFromUtils
import org.apache.spark.rdd.RDD

object CalculateUtil {

  /**
   * Total turnover
   */
  def calculateTotalCount(fields: RDD[Array[String]]): Unit = {
    if (!fields.isEmpty()) {
      val priceRdd: RDD[Double] = fields.map(arr => {
        val price = arr(arr.length - 1).toDouble
        price
      })
      // sum the prices
      val totalCount = priceRdd.reduce(_ + _)
      // get a Redis connection
      val conn = RedisPool.getConnection()
      conn.incrByFloat("totalCount", totalCount)
      conn.close()
    }
  }

  /**
   * Turnover per item category
   */
  def calculateSortItem(fields: RDD[Array[String]]): Unit = {
    if (!fields.isEmpty()) {
      val itemAndPrice: RDD[(String, Double)] = fields.map(arr => {
        val item = arr(2)
        val price = arr(arr.length - 1).toDouble
        (item, price)
      })
      // aggregate per category
      val itemCount: RDD[(String, Double)] = itemAndPrice.reduceByKey(_ + _)
      itemCount.foreachPartition(it => {
        // one Redis connection per partition
        val conn = RedisPool.getConnection()
        it.foreach(f => {
          conn.incrByFloat(f._1, f._2)
        })
        conn.close()
      })
    }
  }

  /**
   * Turnover per province
   */
  def calculateProvince(fields: RDD[Array[String]], broadcastValue: Array[(Long, Long, String)]): Unit = {
    if (!fields.isEmpty()) {
      val provinceAndPrice: RDD[(String, Double)] = fields.map(arr => {
        val ip = arr(1)
        val ipNum = IpFromUtils.ipToLong(ip)
        val provinceIndex = IpFromUtils.binarySearch(broadcastValue, ipNum)
        var province = "未知" // "unknown"
        if (-1 != provinceIndex) {
          province = broadcastValue(provinceIndex)._3
        }
        val price = arr(arr.length - 1).toDouble
        (province, price)
      })
      // accumulate each province's turnover into Redis
      val result = provinceAndPrice.reduceByKey(_ + _)
      result.foreachPartition(f => {
        val conn = RedisPool.getConnection()
        f.foreach(r => {
          conn.incrByFloat(r._1, r._2)
        })
        conn.close()
      })
    }
  }
}
package com.ws.spark

import scala.io.Source

/**
 * Look up the region an IP address belongs to
 */
object IpFromUtils {

  /**
   * Convert a dotted IP string to its decimal value
   *
   * @param ip
   * @return
   */
  def ipToLong(ip: String): Long = {
    val ipArr = ip.split("[.]")
    var ipNum = 0L
    for (i <- ipArr) {
      ipNum = i.toLong | ipNum << 8
    }
    ipNum
  }

  /**
   * Build the rules from a local file
   *
   * @param filePath
   * @return
   */
  def rules(filePath: String): Array[(Long, Long, String)] = {
    val source = Source.fromFile(filePath)
    val array: Array[(Long, Long, String)] = source.getLines().map(line => {
      val strArr = line.split("[|]")
      val startNum = strArr(2).toLong
      val endNum = strArr(3).toLong
      val province = strArr(6)
      (startNum, endNum, province)
    }).toArray
    array
  }

  def generalRules(line: String): (Long, Long, String) = {
    val strArr = line.split("[|]")
    val ipStart = strArr(2).toLong
    val ipEnd = strArr(3).toLong
    val province = strArr(6)
    (ipStart, ipEnd, province)
  }

  /**
   * Binary search (the rules must be sorted)
   */
  def binarySearch(array: Array[(Long, Long, String)], num: Long): Int = {
    var end = array.length - 1
    var begin = 0
    while (begin <= end) {
      val middle = (begin + end) >> 1
      if (num >= array(middle)._1 && num <= array(middle)._2) {
        return middle
      }
      if (num < array(middle)._1) {
        end = middle - 1
      } else {
        begin = middle + 1
      }
    }
    -1
  }
}
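A small, self-contained check of ipToLong and binarySearch. The two rule rows below are made-up sample ranges rather than real ip.txt data, and the IpFromUtilsDemo object is added here only for illustration.

package com.ws.spark

object IpFromUtilsDemo {
  def main(args: Array[String]): Unit = {
    // made-up (startIp, endIp, province) ranges, sorted by start value
    val rules = Array(
      (IpFromUtils.ipToLong("1.0.1.0"), IpFromUtils.ipToLong("1.0.3.255"), "福建"),
      (IpFromUtils.ipToLong("1.0.8.0"), IpFromUtils.ipToLong("1.0.15.255"), "廣東")
    )
    val ipNum = IpFromUtils.ipToLong("1.0.2.20")
    val idx = IpFromUtils.binarySearch(rules, ipNum)
    // 1.0.2.20 falls inside the first range, so this prints "福建"
    println(if (idx != -1) rules(idx)._3 else "未知")
  }
}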
Redis connection pool:
package com.ws.streaming

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisPool {

  val config = new JedisPoolConfig()
  // maximum number of idle connections
  config.setMaxIdle(5)
  // maximum number of connections
  config.setMaxTotal(20)

  // Redis host, port and connection timeout (ms)
  val pool = new JedisPool(config, "192.168.127.12", 6379, 10000)

  def getConnection(): Jedis = {
    pool.getResource
  }
}
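To spot-check the running totals, the accumulated values can be read back from the same Redis instance. A minimal sketch, assuming the streaming job has already written the keys used by CalculateUtil; "totalCount" comes from the code above, and the ShowTotals object is added here only for illustration.

package com.ws.streaming

object ShowTotals {
  def main(args: Array[String]): Unit = {
    val conn = RedisPool.getConnection()
    // read the running total written by calculateTotalCount
    println(s"totalCount = ${conn.get("totalCount")}")
    conn.close()
  }
}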