Structured Streaming如何實現Parquet儲存目錄按時間分割槽
阿新 • • 發佈:2019-01-29
緣由
StreamingPro現在支援以SQL指令碼的形式寫Structured Streaming流式程式了: mlsql-stream。不過期間遇到個問題,我希望按天進行分割槽,但是這個分割槽比較特殊,就是是按接收時間來落地進行分割槽,而不是記錄產生的時間。
當然,我可以新增一個時間欄位,然後使用partitionBy動態分割槽的方式解決這個問題,但是使用動態分割槽有一個麻煩的地方是,刪除資料並不方便。流式程式會不斷地寫入資料,我們需要將七天前的資料清理掉,因為採用partitionBy後,parquet的meta資訊是會在同一個目錄裡,然後裡面的檔案記錄了當前批次資料分佈在那些檔案裡。這樣導致刪除資料不方便了。
所以最好的方式是類似這樣的:
set today="select current_date..." options type=sql;
load kafka9....;
save append table21
as parquet.`/tmp/abc2/hp_date=${today}`
options mode="Append"
and duration="10"
and checkpointLocation="/tmp/cpl2";
這種方式的好處就是,刪除分割槽直接刪除就可以,壞處是,通過上面的方式,由於Structured Streaming的目錄地址是不允許變化的,也就是他拿到一次值之後,後續就固定了,所以資料都會寫入到服務啟動的那天。
解決方案
解決辦法是自己實現一個parquet sink,改造的地方並不多。新新增一個類:
class NewFileStreamSink( sparkSession: SparkSession, _path: String, fileFormat: FileFormat, partitionColumnNames: Seq[String], options: Map[String, String]) extends Sink with Logging { // 使用velocity模板引擎,方便實現複雜的模板渲染 def evaluate(value: String, context: Map[String, AnyRef]) = { RenderEngine.render(value, context) } // 將路徑獲取改成一個方法呼叫,這樣每次寫入時,都會通過方法呼叫 //從而獲得一個新值 def path = { evaluate(_path, Map("date" -> new DateTime())) } -- 這些路徑獲取都需要變成方法 private def basePath = new Path(path) private def logPath = new Path(basePath, FileStreamSink.metadataDir) private def fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString) private val hadoopConf = sparkSession.sessionState.newHadoopConf() override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) { logInfo(s"Skipping already committed batch $batchId") } else { val committer = FileCommitProtocol.instantiate( className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass, jobId = batchId.toString, outputPath = path, isAppend = false) committer match { case manifestCommitter: ManifestFileCommitProtocol => manifestCommitter.setupManifestOptions(fileLog, batchId) case _ => // Do nothing } FileFormatWriter.write( sparkSession = sparkSession, queryExecution = data.queryExecution, fileFormat = fileFormat, committer = committer, outputSpec = FileFormatWriter.OutputSpec(path, Map.empty), hadoopConf = hadoopConf, partitionColumnNames = partitionColumnNames, bucketSpec = None, refreshFunction = _ => (), options = options) } } override def toString: String = s"FileSink[$path]" }
實現sink之後,我們還需要一個DataSource 以便我們能讓這個新的Sink整合進Spark裡並被外部使用:
package org.apache.spark.sql.execution.streaming.newfile
import org.apache.spark.sql.{AnalysisException, SQLContext}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming. Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode
class DefaultSource extends StreamSinkProvider {
override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
val path = parameters.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
if (outputMode != OutputMode.Append) {
throw new AnalysisException(
s"Data source ${getClass.getCanonicalName} does not support $outputMode output mode")
}
new NewFileStreamSink(sqlContext.sparkSession, parameters("path"), new ParquetFileFormat(), partitionColumns, parameters)
}
}
這個是標準的datasource API。 現在使用時可以這樣:
save append table21
-- 使用jodatime的語法
as parquet.`/tmp/jack/hp_date=${date.toString("yyyy-MM-dd")}`
options mode="Append"
and duration="10"
-- 指定實現類
and implClass="org.apache.spark.sql.execution.streaming.newfile"
and checkpointLocation="/tmp/cpl2";
是不是很方便?
額外的問題
在spark 2.2.0 之後,對meta檔案合併,Spark做了些調整,如果合併過程中,發現之前的某個checkpoint點 檔案會丟擲異常。在spark 2.2.0則不存在這個問題。其實spark團隊應該把這個作為可選項比較好,允許丟擲或者保持安靜。