1. 程式人生 > >通過Spark Streaming處理交易數據

通過Spark Streaming處理交易數據

amp 引入 解決方案 框架 ins 容錯 ams 輕量 rdm

Apache Spark 是加州大學伯克利分校的 AMPLabs 開發的開源分布式輕量級通用計算框架。

由於 Spark 基於內存設計,使得它擁有比 Hadoop 更高的性能(極端情況下可以達到 100x),並且對多語言(Scala、Java、Python)提供支持。

其一棧式設計特點使得我們的學習和維護成本大大地減少,而且其提供了很好的容錯解決方案

業務場景

我們每天都有來自全國各地的天然氣購氣數據,並根據用戶的充氣,退氣,核銷等實時計算分析的是用戶訂單數數據,由於數據量比較大,單臺機器處理已經達到了瓶頸;綜合業務場景分析,我們選用 Spark Streaming + Kafka+Flume+Hbase+kudu 來處理這些日誌;又因為業務系統不統一,先通過Spark Streaming對數據進行清洗後再回寫kafka集群,因為會有其他業務也需要kafka的數據;通過通過不同的程序對kafka數據進行消費,用戶記錄以多版本方式記錄到hbase;需要經常統計的指標業務數據寫入kudu

業務代碼:

  創建DStream

val sparkConf = new SparkConf().setAppName("OrderSpark")

val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerAddress,"group.id" -> groupId)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, Set(topic))

返回的messages 是一個 DStream,它是對 RDD 的封裝,其上的很多操作都類似於 RDD;

createDirectStream 函數是 Spark 1.3.0 開始引入的,其內部實現是調用 Kafka 的低層次 API,Spark 本身維護 Kafka 偏移量等信息,所以可以保證數據零丟失

但是機器一旦宕機或者重啟時,可能會存在重復消費;因此我們可以通過自己對offset進行checkpoint

  獲取kafkaoffset

   val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    var offsetRanges 
= Array[OffsetRange]() kafkaStream.transform{ rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.foreachRDD(rdd=>{ for(o <- offsetRanges) { println(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######") }
}

為了能夠在 Spark Streaming 程序掛掉後又能從斷點處恢復,我們每個批次進行向zookeeper進行 Checkpoint;

這裏我們沒有采用spark自帶的checkpoint,是因為一旦程序修改,之前序列化的checkpoint數據會沖突報錯,

當然checkpoint到文件也會隨之越大。(讀者可以自己搜索spark 文件checkpoint的弊端)

  啟動實時程序

    ssc.start()
    ssc.awaitTermination()

  因業務所需需要向kafka回寫數據

  

rdd.foreachPartition(partition=>{
        val props = new Properties()
        props.put("bootstrap.servers",Constans.brokers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String,String](props)
        partition.foreach(r=>{
          val record = new ProducerRecord[String, String](Constans.topic_kc, new Random().nextInt(3), "", msg)
      producer.send(record,new Callback() {
       override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
       if (null != e) {
       println("發送消息失敗=>"+msg)
       }
      }
      })
  }) producer.close() })

監控

系統部署上線之後,我們無法保證系統 7x24 小時都正常運行,即使是在運行著,我們也無法保證 Job 不堆積、是否及時處理 Kafka 中的數據;
而且 Spark Streaming 系統本身就不很穩定。所以我們需要實時地監控系統,包括監控Kafka 集群、Spark Streaming 程序。
我們所有的監控都是CDH自帶監控管理和Ganglia以及nagios,一旦檢測到異常,系統會自己先重試是否可以自己恢復,如果不行,就會給我們發送報警郵件和打電話。

通過Spark Streaming處理交易數據