1. 程式人生 > >pyspark對應的scala代碼PythonRDD對象

pyspark對應的scala代碼PythonRDD對象

pri flat string oge 隨機 list localhost ride 執行

pyspark jvm端的scala代碼PythonRDD

代碼版本為 spark 2.2.0

1.PythonRDD.object

這個靜態類是pyspark的一些基礎入口

// 這裏不會把這個類全部內容都介紹,因為大部分都是靜態接口,被pyspark的同名代碼調用
// 這裏介紹幾個主要函數
// 在pyspark的RDD中作為所有action的基礎的collect方法調用的collectAndServer方法也在這個對象中被定義
private[spark] object PythonRDD extends Logging {
  //被pyspark.SparkContext.runJob調用
  //提供rdd.collect的功能,提交job
  def runJob(
      sc: SparkContext,
      rdd: JavaRDD[Array[Byte]],
      partitions: JArrayList[Int]): Int = {
    type ByteArray = Array[Byte]
    type UnrolledPartition = Array[ByteArray]
    val allPartitions: Array[UnrolledPartition] =
      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala)
    val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
    serveIterator(flattenedPartition.iterator,
      s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}")
  }

  // 整個pyspark.RDD 的action都是在這個函數中被觸發的
  // pyspark.RDD 的collect通過調用這個方法被觸發rdd的執行和任務提交
  def collectAndServe[T](rdd: RDD[T]): Int = {
    //參數rdd 即pyspark中RDD裏的_jrdd, 對應的是scala裏數據源rdd或pythonRDD
    // 這裏rdd.collect() 觸發了任務開始運行
    serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
  }
  
  //這個函數的作用是將計算結果寫入到本地socket當中,再在pyspark裏讀取本地socket獲取結果
  def serveIterator[T](items: Iterator[T], threadName: String): Int = {
    // 可以看見socket在本地隨機端口和localhost上建立出來的
    val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
    // Close the socket if no connection in 3 seconds
    serverSocket.setSoTimeout(3000)

    // 這裏啟動一個線程負責將結果寫入到socket中
    new Thread(threadName) {
      setDaemon(true)
      override def run() {
        try {
          val sock = serverSocket.accept()
          val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
          Utils.tryWithSafeFinally {
            //具體負責寫的是此函數,此函數主要做一些類型和序列化工作
            writeIteratorToStream(items, out)
          } {
            out.close()
          }
        } catch {
          case NonFatal(e) =>
            logError(s"Error while sending iterator", e)
        } finally {
          serverSocket.close()
        }
      }
    }.start()

    // 最後返回此socket的網絡端口, 這樣pyspark裏就可以通過此端口讀取數據
    serverSocket.getLocalPort
  }

  // 此函數負責寫入數據結果
  // 做一些類型檢查和對應的序列化工作
  // PythonRunner中WriterThread寫入數據時使用的也是此函數
  def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {

    def write(obj: Any): Unit = obj match {
      case null =>
        dataOut.writeInt(SpecialLengths.NULL)
      case arr: Array[Byte] =>
        dataOut.writeInt(arr.length)
        dataOut.write(arr)
      case str: String =>
        writeUTF(str, dataOut)
      case stream: PortableDataStream =>
        write(stream.toArray())
      case (key, value) =>
        write(key)
        write(value)
      case other =>
        throw new SparkException("Unexpected element type " + other.getClass)
    }

    iter.foreach(write)
  }


}

pyspark對應的scala代碼PythonRDD對象