
spark zip && zipPartitions && zipWithIndex && zipWithUniqueId

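zip is a transformation operator that pairs up the elements of two RDDs (KV or non-KV) into a single KV-format RDD. The two RDDs must have the same number of partitions, and each pair of corresponding partitions must contain the same number of elements.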

spark.sparkContext.setLogLevel("error")
val kzc = spark.sparkContext.parallelize(1.to(10), 2)
val bd = spark.sparkContext.parallelize(List("a","b","c","d","e","f","g","h","i","j"), 2)
// Both RDDs have 2 partitions of 5 elements, so zip pairs them by position: (1,a) ... (10,j)
kzc.zip(bd).collect().foreach(println(_))
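The zip contract can be explored without a cluster. Below is a minimal pure-Scala sketch, assuming an RDD can be modeled as a `Seq` of partitions (`Seq[Seq[A]]`) and that `parallelize(1.to(10), 2)` splits its input into two contiguous halves. `ZipModel` and `Partitions` are hypothetical names, not Spark API, and the `require` messages are illustrative rather than Spark's own errors. The two checks encode zip's requirements: equal partition counts, and equal element counts within each pair of corresponding partitions.

```scala
object ZipModel {
  // Hypothetical model: an "RDD" is a Seq of partitions, each partition a Seq of elements
  type Partitions[A] = Seq[Seq[A]]

  // Mirrors the zip contract: equal partition counts and, inside each pair of
  // corresponding partitions, equal element counts; elements are paired by position
  def zip[A, B](left: Partitions[A], right: Partitions[B]): Partitions[(A, B)] = {
    require(left.size == right.size, s"partition counts differ: ${left.size} vs ${right.size}")
    left.zip(right).map { case (l, r) =>
      require(l.size == r.size, s"partition sizes differ: ${l.size} vs ${r.size}")
      l.zip(r)
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed layout of parallelize(1.to(10), 2) and of the 10-letter list with 2 slices
    val kzc: Partitions[Int] = Seq((1 to 5).toList, (6 to 10).toList)
    val bd: Partitions[String] = Seq(List("a", "b", "c", "d", "e"), List("f", "g", "h", "i", "j"))
    zip(kzc, bd).flatten.foreach(println) // (1,a) ... (10,j)

    // Same partition count but unequal partition sizes: rejected
    val shorter: Partitions[String] = Seq(List("a", "b", "c"), List("d", "e", "f"))
    println(scala.util.Try(zip(kzc, shorter)).isFailure) // true
  }
}
```

Modeling partitions explicitly makes it clear why two RDDs with the same total size can still fail to zip: what matters is how the elements are distributed across partitions.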

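zipPartitions combines two RDDs partition by partition: the supplied function receives one iterator over each RDD's corresponding partition and returns an iterator of results. The RDDs must have the same number of partitions, but unlike zip, the partitions do not need to hold the same number of elements.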

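spark.sparkContext.setLogLevel("error")
val kzc = spark.sparkContext.parallelize(1.to(10), 2)
val bd = spark.sparkContext.parallelize(List("a","b","c","d","e","f"), 2)
// Called once per pair of corresponding partitions (one iterator from each RDD)
val res = kzc.zipPartitions(bd) { (iterator1, iterator2) =>
  val result = new scala.collection.mutable.ListBuffer[String]()
  // Pair elements until either partition runs out; leftovers are dropped
  while (iterator1.hasNext && iterator2.hasNext) {
    result.append(s"${iterator1.next()}|${iterator2.next()}")
  }
  result.iterator
}
// Prints 1|a, 2|b, 3|c, 6|d, 7|e, 8|f, one per line
res.collect().foreach(println(_))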
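The partition-wise behavior can also be modeled in plain Scala. This sketch assumes partitions can be represented as `Seq[Seq[A]]` and that `parallelize` splits its input into contiguous, evenly sized slices (5 + 5 numbers, 3 + 3 letters). `ZipPartitionsModel`, its `zipPartitions` method, and `pairUp` are hypothetical stand-ins, not the Spark API. Because each number partition holds 5 elements and each letter partition holds 3, the loop keeps the first 3 numbers of each partition and discards the rest.

```scala
object ZipPartitionsModel {
  // Hypothetical model: an "RDD" is a Seq of partitions, each partition a Seq of elements
  type Partitions[A] = Seq[Seq[A]]

  // Stand-in for RDD.zipPartitions: partition i of the result is f applied to
  // iterators over partition i of each input. Only partition counts must match.
  def zipPartitions[A, B, C](left: Partitions[A], right: Partitions[B])
                            (f: (Iterator[A], Iterator[B]) => Iterator[C]): Partitions[C] = {
    require(left.size == right.size, "inputs must have the same number of partitions")
    left.zip(right).map { case (l, r) => f(l.iterator, r.iterator).toList }
  }

  // Same pairing logic as the Spark snippet: stop as soon as either side is exhausted
  def pairUp(it1: Iterator[Int], it2: Iterator[String]): Iterator[String] = {
    val result = scala.collection.mutable.ListBuffer[String]()
    while (it1.hasNext && it2.hasNext) result.append(s"${it1.next()}|${it2.next()}")
    result.iterator
  }

  def main(args: Array[String]): Unit = {
    val kzc: Partitions[Int] = Seq((1 to 5).toList, (6 to 10).toList)
    val bd: Partitions[String] = Seq(List("a", "b", "c"), List("d", "e", "f"))
    // Prints 1|a, 2|b, 3|c, 6|d, 7|e, 8|f, one per line
    zipPartitions(kzc, bd)(pairUp).flatten.foreach(println)
  }
}
```

Since the function sees whole partitions as iterators, it decides how to handle uneven lengths; here it truncates, but it could just as well pad the shorter side.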

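zipWithIndex pairs each element of the RDD with that element's index in the RDD (starting from 0), producing (K, V) pairs. Indices follow partition order first and then the order within each partition, and the index type is Long.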

spark.sparkContext.setLogLevel("error")
val bd = spark.sparkContext.parallelize(List("a","b","c","d","e","f"), 2)
// Prints (a,0), (b,1), ..., (f,5); the index is a Long
bd.zipWithIndex().collect().foreach(println(_))
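The index arithmetic can be sketched in plain Scala. zipWithIndex numbers elements by partition order and then by position, so each partition's starting offset is the total size of the partitions before it (which is why Spark runs an extra job to count them when the RDD has more than one partition). zipWithUniqueId, named in the title, instead gives item i (0-based) of partition k the id `i * n + k`, where n is the number of partitions: no counting pass is needed, but ids can skip values when partitions differ in size. The `Seq[Seq[A]]` partition model and the `IndexModel` name are assumptions for illustration, not Spark API.

```scala
object IndexModel {
  // Hypothetical model: an "RDD" is a Seq of partitions, each partition a Seq of elements
  type Partitions[A] = Seq[Seq[A]]

  // zipWithIndex: partition k starts at the total size of partitions 0 until k
  def zipWithIndex[A](parts: Partitions[A]): Partitions[(A, Long)] = {
    val offsets = parts.map(_.size.toLong).scanLeft(0L)(_ + _)
    parts.zip(offsets).map { case (p, start) =>
      p.zipWithIndex.map { case (x, i) => (x, start + i) }
    }
  }

  // zipWithUniqueId: item i of partition k gets i * n + k, n = number of partitions
  def zipWithUniqueId[A](parts: Partitions[A]): Partitions[(A, Long)] = {
    val n = parts.size.toLong
    parts.zipWithIndex.map { case (p, k) =>
      p.zipWithIndex.map { case (x, i) => (x, i * n + k) }
    }
  }

  def main(args: Array[String]): Unit = {
    val bd: Partitions[String] = Seq(List("a", "b", "c"), List("d", "e", "f"))
    println(zipWithIndex(bd).flatten)    // a..f get 0..5 in order
    println(zipWithUniqueId(bd).flatten) // a,b,c get 0,2,4; d,e,f get 1,3,5

    // Uneven partitions: unique ids leave a gap (2 is never used)
    println(zipWithUniqueId(Seq(List("a"), List("b", "c"))).flatten) // ids 0, 1, 3
  }
}
```

The trade-off is contiguity versus cost: zipWithIndex yields dense indices but needs partition sizes up front, while zipWithUniqueId can assign ids immediately at the price of possible gaps.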