通過Spark Streaming處理交易數據
Apache Spark 是加州大學伯克利分校的 AMPLabs 開發的開源分布式輕量級通用計算框架。
由於 Spark 基於內存設計,使得它擁有比 Hadoop 更高的性能(極端情況下可以達到 100x),並且對多語言(Scala、Java、Python)提供支持。
其一棧式設計特點使得我們的學習和維護成本大大地減少,而且其提供了很好的容錯解決方案
業務場景
我們每天都有來自全國各地的天然氣購氣數據,並根據用戶的充氣,退氣,核銷等實時計算分析的是用戶訂單數數據,由於數據量比較大,單臺機器處理已經達到了瓶頸;綜合業務場景分析,我們選用 Spark Streaming + Kafka+Flume+Hbase+kudu 來處理這些日誌;又因為業務系統不統一,先通過Spark Streaming對數據進行清洗後再回寫kafka集群,因為會有其他業務也需要kafka的數據;通過通過不同的程序對kafka數據進行消費,用戶記錄以多版本方式記錄到hbase;需要經常統計的指標業務數據寫入kudu
業務代碼:
創建DStream
val sparkConf = new SparkConf().setAppName("OrderSpark") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerAddress,"group.id" -> groupId) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, Set(topic))
返回的messages 是一個 DStream,它是對 RDD 的封裝,其上的很多操作都類似於 RDD;
createDirectStream 函數是 Spark 1.3.0 開始引入的,其內部實現是調用 Kafka 的低層次 API,Spark 本身維護 Kafka 偏移量等信息,所以可以保證數據零丟失
但是機器一旦宕機或者重啟時,可能會存在重復消費;因此我們可以通過自己對offset進行checkpoint
獲取kafkaoffset
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) var offsetRanges= Array[OffsetRange]() kafkaStream.transform{ rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.foreachRDD(rdd=>{ for(o <- offsetRanges) { println(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######") }
}
為了能夠在 Spark Streaming 程序掛掉後又能從斷點處恢復,我們每個批次進行向zookeeper進行 Checkpoint;
這裏我們沒有采用spark自帶的checkpoint,是因為一旦程序修改,之前序列化的checkpoint數據會沖突報錯,
當然checkpoint到文件也會隨之越大。(讀者可以自己搜索spark 文件checkpoint的弊端)
啟動實時程序
ssc.start()
ssc.awaitTermination()
因業務所需需要向kafka回寫數據
rdd.foreachPartition(partition=>{ val props = new Properties() props.put("bootstrap.servers",Constans.brokers) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String,String](props) partition.foreach(r=>{ val record = new ProducerRecord[String, String](Constans.topic_kc, new Random().nextInt(3), "", msg)
producer.send(record,new Callback() {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (null != e) {
println("發送消息失敗=>"+msg)
}
}
})
}) producer.close() })
監控
系統部署上線之後,我們無法保證系統 7x24 小時都正常運行,即使是在運行著,我們也無法保證 Job 不堆積、是否及時處理 Kafka 中的數據;
而且 Spark Streaming 系統本身就不很穩定。所以我們需要實時地監控系統,包括監控Kafka 集群、Spark Streaming 程序。
我們所有的監控都是CDH自帶監控管理和Ganglia以及nagios,一旦檢測到異常,系統會自己先重試是否可以自己恢復,如果不行,就會給我們發送報警郵件和打電話。
通過Spark Streaming處理交易數據