
Reading the Spark Source, Part 1: An Analysis of the spark-mongodb Connector Code

The source lives on GitHub at https://github.com/mongodb/mongo-spark. It is the Spark connector library published by MongoDB, and it makes it easy to read and write MongoDB data from Spark.

1. Writing an RDD to MongoDB

There are two ways to write a generated RDD to MongoDB. Example code:

val sc = getSparkContext(args)
import com.mongodb.spark._
import org.bson.Document
val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(documents)

import com.mongodb.spark.config._
val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc)))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)

The functions called are:

MongoSpark.save(rdd)
MongoSpark.save(rdd, writeConfig)

Looking at the definition def save[D: ClassTag](rdd: RDD[D]): Unit = save(rdd, WriteConfig(rdd.sparkContext)), both variants end up in MongoSpark.save(rdd, writeConfig). Here is its implementation:
def save[D: ClassTag](rdd: RDD[D], writeConfig: WriteConfig): Unit = {
  val mongoConnector = MongoConnector(writeConfig.asOptions)
  rdd.foreachPartition(iter => if (iter.nonEmpty) {
    mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
      iter.grouped(writeConfig.maxBatchSize).foreach(batch => collection.insertMany(batch.toList.asJava))
    })
  })
}
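The batching seen here is driven by writeConfig.maxBatchSize, which can be tuned when building the WriteConfig. A minimal sketch, assuming the sc from the example above and assuming the connector accepts the maxBatchSize option key listed in its write-configuration documentation:

import com.mongodb.spark._
import com.mongodb.spark.config._
import org.bson.Document

// Cap each insertMany batch at 256 documents instead of the default.
val batchedWriteConfig = WriteConfig(
  Map("collection" -> "spark", "maxBatchSize" -> "256"),
  Some(WriteConfig(sc)))
val manyDocuments = sc.parallelize((1 to 1000).map(i => Document.parse(s"{test: $i}")))
MongoSpark.save(manyDocuments, batchedWriteConfig)  // each partition is written in batches of at most 256 documents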
So the write path boils down to mongoConnector.withCollectionDo writing each partition of the RDD to MongoDB, one insertMany batch at a time.

2. Reading data from MongoDB into an RDD

There are also two ways to read. Example code:

val rdd = MongoSpark.load(sc)
println(rdd.count)

val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)

As with the save functions, the load calls ultimately go through MongoSpark.load(sc, readConfig):
def load[D: ClassTag](sc: SparkContext, readConfig: ReadConfig)(implicit e: D DefaultsTo Document): MongoRDD[D] =
  builder().sparkContext(sc).readConfig(readConfig).build().toRDD[D]()

def builder(): Builder = new Builder

The relevant Builder code:

def sparkContext(sparkContext: SparkContext): Builder = {
  this.sparkSession = Option(SparkSession.builder().config(sparkContext.getConf).getOrCreate())
  this
}

def readConfig(readConfig: ReadConfig): Builder = {
  this.readConfig = Option(readConfig)
  this
}

def build(): MongoSpark = {
  require(sparkSession.isDefined, "The SparkSession must be set, either explicitly or via the SparkContext")
  val session = sparkSession.get
  val readConf = readConfig.isDefined match {
    case true  => ReadConfig(options, readConfig)
    case false => ReadConfig(session.sparkContext.getConf, options)
  }
  val mongoConnector = connector.getOrElse(MongoConnector(readConf))
  val bsonDocumentPipeline = pipeline.map(x => x.toBsonDocument(classOf[Document], mongoConnector.codecRegistry))
  new MongoSpark(session, mongoConnector, readConf, bsonDocumentPipeline)
}

build() assembles the MongoConnector and the aggregation pipeline from the ReadConfig, and the load then goes through MongoSpark.toRDD:

def toRDD[D: ClassTag]()(implicit e: D DefaultsTo Document): MongoRDD[D] = rdd[D]

private def rdd[D: ClassTag]()(implicit e: D DefaultsTo Document): MongoRDD[D] =
  new MongoRDD[D](sparkSession, sparkSession.sparkContext.broadcast(connector), readConfig, pipeline)
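Putting these pieces together: by the definition of load above, the two call paths below produce the same MongoRDD. A sketch that assumes the sc and readConfig from the earlier read example:

import com.mongodb.spark._

// Convenience entry point...
val rddViaLoad = MongoSpark.load(sc, readConfig)
// ...and the builder chain that load expands to.
val rddViaBuilder = MongoSpark.builder()
  .sparkContext(sc)
  .readConfig(readConfig)
  .build()
  .toRDD()

println(rddViaLoad.count)     // both RDDs read the collection described by readConfig
println(rddViaBuilder.count)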
The result is a MongoRDD, which extends Spark's RDD, so all the usual map/reduce-style processing can be applied to it. This raises a question: a MongoDB collection may contain a huge number of documents, and every RDD is made up of multiple partitions, so how is the collection split into partitions? That is controlled by the ReadConfig and its partitioner settings; the documentation is at https://github.com/mongodb/mongo-spark/blob/master/doc/2-configuring.md.

As an example, ReadConfig lets you choose a partitioner; the default is MongoDefaultPartitioner, which simply wraps MongoSamplePartitioner. Its parameters are listed in the table below (a configuration sketch follows the table); for the other partitioners, refer to the documentation above.
Property name | Description | Default value
partitionKey | The field to partition the collection by. The field should be indexed and contain unique values. | _id
partitionSizeMB | The size (in MB) for each partition. | 64
samplesPerPartition | The number of sample documents to take for each partition. | 10
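These properties are passed through ReadConfig under the partitioner option namespace. A minimal configuration sketch, assuming the sc from the earlier examples and taking the partitioner and partitionerOptions.* key names from the configuration document linked above:

import com.mongodb.spark._
import com.mongodb.spark.config._

val sampledReadConfig = ReadConfig(
  Map(
    "collection"                             -> "spark",
    "partitioner"                            -> "MongoSamplePartitioner",
    "partitionerOptions.partitionKey"        -> "_id",  // field to partition by
    "partitionerOptions.partitionSizeMB"     -> "64",   // target size of each partition
    "partitionerOptions.samplesPerPartition" -> "10"    // samples drawn per partition
  ),
  Some(ReadConfig(sc)))
val sampledRdd = MongoSpark.load(sc, sampledReadConfig)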
The concrete implementation is in MongoSamplePartitioner.scala and PartitionerHelper.scala. Starting with MongoSamplePartitioner.scala: it first queries the collection statistics (average object size and document count), then computes how many documents each partition should hold and how many sample documents to draw:
val avgObjSizeInBytes = results.get("avgObjSize", new BsonInt64(0)).asNumber().longValue()
val numDocumentsPerPartition: Int = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt
val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt
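To make the arithmetic concrete, here is a worked instance of these two formulas using the numbers from the walkthrough below (the values are illustrative and simply shadow the names used in the source):

val avgObjSizeInBytes    = 1024L              // assume each document is about 1 KB
val partitionSizeInBytes = 64L * 1024 * 1024  // partitionSizeMB = 64, the default
val samplesPerPartition  = 10                 // the default sample count
val count                = 128L * 1024        // 128K documents in the collection

val numDocumentsPerPartition = math.floor(partitionSizeInBytes.toFloat / avgObjSizeInBytes).toInt
// = 65536, i.e. 64K documents per partition
val numberOfSamples = math.floor(samplesPerPartition * count / numDocumentsPerPartition.toFloat).toInt
// = 20 samples, which at 10 samples per partition corresponds to 2 partitions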
Suppose each document is about 1 KB; then by default a partition holds 64K documents (64 MB / 1 KB). Next comes the number of samples per partition, which defaults to 10: if the collection contains 128K documents, the formula gives numberOfSamples = 10 * 128K / 64K = 20 samples. The collection therefore ends up split into 2 partitions, each covering about 64K documents and described by 10 sample documents.

def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count - 1
val rightHandBoundaries = samples.zipWithIndex.collect {
  case (field, i) if collectSplit(i) => field.get(partitionKey)
}

Every samplesPerPartition-th sampled document contributes its partitionKey value as a right-hand split boundary. Finally, the key step: creating the partitions in PartitionerHelper.scala:

val partitions = PartitionerHelper.createPartitions(partitionKey, rightHandBoundaries, PartitionerHelper.locations(connector), addMinMax)

This generates a MongoPartition for each of the key ranges computed above. Note that a partition stores only the bounds on the partition key and the addresses of the MongoDB servers; the documents themselves are read only when the job actually computes on the partition (a rough sketch of such a partition descriptor follows below). The code for the other partitioners is similar; in practice the most commonly used ones are probably MongoShardedPartitioner and MongoSplitVectorPartitioner, which are not covered here. Refer to the code for details.
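As referenced above, each partition produced by createPartitions is just a lightweight descriptor. Roughly speaking it looks like the following paraphrased sketch, with illustrative field names rather than the connector's exact source:

import org.apache.spark.Partition
import org.bson.BsonDocument

// Illustrative shape of a MongoDB partition descriptor: it records where the
// partition's key range starts and ends and which servers to read from, but
// holds no documents; the data is fetched only when the task runs.
case class PartitionSketch(
  index: Int,                 // Spark partition index
  queryBounds: BsonDocument,  // e.g. { _id: { $gte: <lower>, $lt: <upper> } }
  locations: Seq[String]      // MongoDB host addresses preferred for this partition
) extends Partition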