pyspark對應的scala代碼PythonRDD對象
阿新 • • 發佈:2018-05-17
pri flat string oge 隨機 list localhost ride 執行
pyspark jvm端的scala代碼PythonRDD
代碼版本為 spark 2.2.0
1.PythonRDD.object
這個靜態類是pyspark的一些基礎入口
// 這裏不會把這個類全部內容都介紹,因為大部分都是靜態接口,被pyspark的同名代碼調用 // 這裏介紹幾個主要函數 // 在pyspark的RDD中作為所有action的基礎的collect方法調用的collectAndServer方法也在這個對象中被定義 private[spark] object PythonRDD extends Logging { //被pyspark.SparkContext.runJob調用 //提供rdd.collect的功能,提交job def runJob( sc: SparkContext, rdd: JavaRDD[Array[Byte]], partitions: JArrayList[Int]): Int = { type ByteArray = Array[Byte] type UnrolledPartition = Array[ByteArray] val allPartitions: Array[UnrolledPartition] = sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala) val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*) serveIterator(flattenedPartition.iterator, s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}") } // 整個pyspark.RDD 的action都是在這個函數中被觸發的 // pyspark.RDD 的collect通過調用這個方法被觸發rdd的執行和任務提交 def collectAndServe[T](rdd: RDD[T]): Int = { //參數rdd 即pyspark中RDD裏的_jrdd, 對應的是scala裏數據源rdd或pythonRDD // 這裏rdd.collect() 觸發了任務開始運行 serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}") } //這個函數的作用是將計算結果寫入到本地socket當中,再在pyspark裏讀取本地socket獲取結果 def serveIterator[T](items: Iterator[T], threadName: String): Int = { // 可以看見socket在本地隨機端口和localhost上建立出來的 val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost")) // Close the socket if no connection in 3 seconds serverSocket.setSoTimeout(3000) // 這裏啟動一個線程負責將結果寫入到socket中 new Thread(threadName) { setDaemon(true) override def run() { try { val sock = serverSocket.accept() val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream)) Utils.tryWithSafeFinally { //具體負責寫的是此函數,此函數主要做一些類型和序列化工作 writeIteratorToStream(items, out) } { out.close() } } catch { case NonFatal(e) => logError(s"Error while sending iterator", e) } finally { serverSocket.close() } } }.start() // 最後返回此socket的網絡端口, 這樣pyspark裏就可以通過此端口讀取數據 serverSocket.getLocalPort } // 此函數負責寫入數據結果 // 做一些類型檢查和對應的序列化工作 // PythonRunner中WriterThread寫入數據時使用的也是此函數 def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) { def write(obj: Any): Unit = obj match { case null => dataOut.writeInt(SpecialLengths.NULL) case arr: Array[Byte] => dataOut.writeInt(arr.length) dataOut.write(arr) case str: String => writeUTF(str, dataOut) case stream: PortableDataStream => write(stream.toArray()) case (key, value) => write(key) write(value) case other => throw new SparkException("Unexpected element type " + other.getClass) } iter.foreach(write) } }
pyspark對應的scala代碼PythonRDD對象