深入淺出Spark2.1.0度量系統——Sink繼承體系
Source準備好度量資料後,我們就需要考慮如何輸出和使用的問題。這裡介紹一些常見的度量輸出方式:阿里資料部門採用的一種度量使用方式就是輸出到日誌;在命令列執行過Hadoop任務(例如:mapreduce)的使用者也會發現控制檯列印的內容中也包含度量資訊;使用者可能希望將有些度量資訊儲存到檔案(例如CSV),以便將來能夠檢視;如果覺得使用CSV或者控制檯等方式不夠直觀,還可以將採集到的度量資料輸出到專用的監控系統介面。這些最終對度量資料的使用,或者說是輸出方式,Spark將它們統一抽象為Sink。Sink的定義見程式碼清單1。
程式碼清單1 度量輸出的定義
private[spark] trait Sink { def start(): Unit def stop(): Unit def report(): Unit }
從程式碼清單3-53可以看到Sink是一個特質,包含三個介面方法:
- start:啟動Sink;
- stop:停止Sink;
- report:輸出到目的地;
從這三個方法的解釋來看,很難讓讀者獲得更多的資訊。我們先把這些困惑放在一邊,來看看Spark中Sink的類繼承體系,如圖1所示。
圖1中展示了6種Sink的具體實現。
- ConsoleSink:藉助Metrics提供的ConsoleReporter的API,將度量輸出到System.out,因此可以輸出到控制檯。
- CsvSink:藉助Metrics提供的CsvReporter的API,將度量輸出到CSV檔案。
- MetricsServlet:在Spark UI的jetty服務中建立ServletContextHandler,將度量資料通過Spark UI展示在瀏覽器中。
- JmxSink:藉助Metrics提供的JmxReporter的API,將度量輸出到MBean中,這樣就可以開啟Java VisualVM,然後開啟Tomcat程序監控,給VisualVM安裝MBeans外掛後,選擇MBeans標籤頁可以對JmxSink所有註冊到JMX中的物件進行管理。
- Slf4jSink:藉助Metrics提供的Slf4jReporter的API,將度量輸出到實現了Slf4j規範的日誌輸出。
- GraphiteSink:藉助Metrics提供的GraphiteReporter的API,將度量輸出到Graphite(一個由Python實現的Web應用,採用django框架,用來收集伺服器狀態的監控系統)。
瞭解了Sink的類繼承體系,我們挑選Slf4jSink作為Spark中Sink實現類的例子,來了解Sink具體該如何實現。Slf4jSink的實現見程式碼清單2。
程式碼清單2 Slf4jSink的實現
private[spark] class Slf4jSink(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
extends Sink {
val SLF4J_DEFAULT_PERIOD = 10
val SLF4J_DEFAULT_UNIT = "SECONDS"
val SLF4J_KEY_PERIOD = "period"
val SLF4J_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => SLF4J_DEFAULT_PERIOD
}
val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
override def report() {
reporter.report()
}
}
從Slf4jSink的實現可以看到Slf4jSink的start、stop及report實際都是代理了Metrics庫中的Slf4jReporter的start、stop及report方法。Slf4jReporter的start方法實際是其父類ScheduledReporter的start實現。而傳遞的兩個引數pollPeriod和pollUnit,正是被ScheduledReporter使用作為定時器獲取資料的週期和時間單位。有關ScheduledReporter中start、stop及Slf4jReporter的report方法的實現可以參閱《附錄D Metrics簡介》。
關於《Spark核心設計的藝術 架構設計與實現》
經過近一年的準備,《Spark核心設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
紙質版售賣連結如下: