1. 程式人生 > 其它 >kafka結合Spark-streming的直連(Direct)方式

kafka結合Spark-streming的直連(Direct)方式

轉載自https://www.cnblogs.com/dongxiucai/p/9971868.html

說明:此程式使用的scala編寫

在spark-stream+kafka使用的時候,有兩種連線方式一種是Receiver連線方式,一種是Direct連線方式。

  兩種連線方式簡介:

  Receiver接受固定時間間隔的資料(放在記憶體中),達到固定的時間才進行處理,效率極並且容易丟失資料。通過高階API,不用管理偏移量,由zk管理,若是拉取的資料超過,executor記憶體大小,訊息會存放到磁碟上面。0.10之後被捨棄。   弊端:效率極並且容易丟失資料

  直連(Direct)方式:**********重點 相當於直接連線到了kafka的分割槽上面,捨棄了高階API,所以需要自己手動管理偏移量。運用底層API。效率高。需要手動的維護偏移量。企業生產使用。   好處:不會走磁碟了,在拉取資料的時候,會有一個預處理機制。效率高。

  兩者的區別:  

  Receiver連線方式:他使用的是高階API實現Offset自動管理,不需要我們管理,所以他的靈活性特別差,不好,而且他處理資料的時候,如果某一時刻所傳來的資料量特別大那麼就會造成磁碟溢寫的情況,他通過WALs進行磁碟的寫入。   直連方式:他使用的是底層的API實現Offset我們開發人員管理,這樣的話,他的靈活性很好,並且可以保證資料的安全性,而且不用孤單行資料量過大。   現在主要使用的Direct直連的方式,而不在使用receiver方式   直連程式碼如下:
  1 import kafka.common.TopicAndPartition
  2 import kafka.message.MessageAndMetadata
3 import kafka.serializer.StringDecoder 4 import kafka.utils.{ZKGroupTopicDirs, ZkUtils} 5 import org.I0Itec.zkclient.ZkClient 6 import org.apache.spark.SparkConf 7 import org.apache.spark.rdd.RDD 8 import org.apache.spark.streaming.dstream.InputDStream 9 import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
10 import org.apache.spark.streaming.{Duration, StreamingContext} 11 import redis.clients.jedis.Jedis 12 13 object KafkaDirectConsumer { 14 def main(args: Array[String]): Unit = { 15 // 建立streaming 16 val conf = new SparkConf().setAppName("demo").setMaster("local[2]") 17 val ssc = new StreamingContext(conf, Duration(5000)) 18 // 建立 19 // 指定消費者組 20 val groupid = "gp01" 21 // 消費者 22 val topic = "tt1" 23 // 建立zk叢集連線 24 val zkQuorum = "spark101:2181,spark102:2181,spark103:2181" 25 // 建立kafka的叢集連線 26 val brokerList = "spark101:9092,spark102:9092,spark103:9092" 27 // 建立消費者的集合 28 // 在streaming中可以同時消費多個topic 29 val topics: Set[String] = Set(topic) 30 // 建立一個zkGroupTopicDir物件 31 // 此物件裡面存放這zk組和topicdir的對應資訊 32 // 就是在zk中寫入kafka的目錄 33 // 傳入 消費者組,消費者,會根據傳入的引數生成dir然後存放在zk中 34 val TopicDir = new ZKGroupTopicDirs(groupid, topic) 35 // 獲取存放在zk中的dir目錄資訊 /gp01/offset/tt 36 val zkTopicPath: String = s"${TopicDir.consumerOffsetDir}" 37 // 準備kafka的資訊、 38 val kafkas = Map( 39 // 指向kafka的叢集 40 "metadata.broker.list" -> brokerList, 41 // 指定消費者組 42 "group.id" -> groupid, 43 // 從頭開始讀取資料 44 "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString 45 ) 46 // 建立一個zkClint客戶端,用host 和 ip 建立 47 // 用於從zk中讀取偏移量資料,並更新偏移量 48 // 傳入zk叢集連線 49 val zkClient = new ZkClient(zkQuorum) 50 // 拿到zkClient後去,zk中查詢是否存在檔案 51 // /gp01/offset/tt/0/10001 52 // /gp01/offset/tt/1/20001 53 // /gp01/offset/tt/2/30001 54 val clientOffset = zkClient.countChildren(zkTopicPath) 55 // 建立空的kafkaStream 裡面用於存放從kafka接收到的資料 56 var kafkaStream: InputDStream[(String, String)] = null 57 // 建立一個存放偏移量的Map 58 // TopicAndPartition [/gp01/offset/tt/0,10001] 59 var fromOffsets: Map[TopicAndPartition, Long] = Map() 60 // 判斷,是否婦女放過offset,若是存放過,則直接從記錄的 61 // 偏移量開始讀 62 if (clientOffset > 0) { 63 // clientOffset的數量就是 分割槽的數目量 64 for (i <- 0 until clientOffset) { 65 // 取出 /gp01/offset/tt/i/ 10001 -> 偏移量 66 val paratitionOffset = zkClient.readData[String](s"${zkTopicPath}/${i}") 67 // tt/ i 68 val tp = TopicAndPartition(topic, i) 69 // 新增到存放偏移量的Map中 70 fromOffsets += (tp -> paratitionOffset.toLong) 71 } 72 // 現在已經把偏移量全部記錄在Map中了 73 // 現在讀kafka中的訊息 74 // key 是kafka的kay,為null, value是kafka中的訊息 75 // 這個會將kafka的訊息進行transform 最終kafka的資料都會變成(kafka的key,message)這樣的tuple 76 val messageHandlers = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) 77 // 通過kafkaUtils來建立DStream 78 // String,String,StringDecoder,StringDecoder,(String,String) 79 // key,value,key的解碼方式,value的解碼方式,(接受的資料格式) 80 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)]( 81 ssc, kafkas, fromOffsets, messageHandlers 82 ) 83 } else { // 若是不存在,則直接從頭讀 84 // 根據kafka的配置 85 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkas, topics) 86 } 87 88 // 偏移量範圍 89 var offsetRanges = Array[OffsetRange]() 90 91 kafkaStream.foreachRDD { 92 kafkaRDD => 93 // 得到kafkaRDD,強轉為HasOffsetRanges,獲得偏移量 94 // 只有Kafka可以強轉為HasOffsetRanges 95 offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges 96 97 // 觸發Action,這裡去第二個值為真實的資料 98 val mapRDD = kafkaRDD.map(_._2) 99 /*=================================================*/      // mapRDD為資料,在這裡對資料操作     // 在這裡寫你自己的業務處理程式碼程式碼     // 此程式可以直接拿來使用,經歷過層層考驗     /*=================================================*/ 100 101 // 儲存更新偏移量 102 for (o <- offsetRanges) { 103 // 獲取dir 104 val zkPath = s"${zkTopicPath}/${o.partition}" 105 ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString) 106 } 107 } 108 109 ssc.start() 110 ssc.awaitTermination() 111 112 } 113 }

以上為Direct直連方式的程式碼,直接可以使用的,根據自己的叢集,和topic,groupid等配置稍作修改即可。