1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_filter運算元

第五章_Spark核心程式設計_Rdd_filter運算元

1. 定義

    /*
    * 1. 定義
    *     def filter(f: T => Boolean): RDD[T]
    *
    * 2. 功能
    *     根據 傳輸函式 對Rdd元素進行過濾,剔除不符合條件的元素
    *
    * 3. note
    *     1. 當資料進行篩選過濾後,分割槽不變,但是分割槽內的資料可能不均衡,生產環境下,可能會出 現資料傾斜
    * */
  object filterTest extends App {
    /*
    * 1. 定義
    *     def filter(f: T => Boolean): RDD[T]
    *
    * 2. 功能
    *     根據 傳輸函式 對Rdd元素進行過濾,剔除不符合條件的元素
    *
    * 3. note
    *     1. 當資料進行篩選過濾後,分割槽不變,但是分割槽內的資料可能不均衡,生產環境下,可能會出 現資料傾斜
    * 
*/ val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd = sc.makeRDD(List(-1, -2, -13, 14, 3, 4, 6, 1, 10, 100), 2) //過濾 Rdd中的負數 private val rdd1: RDD[Int] = rdd.filter(_ > 0) println(s
"當前分割槽數 : ${rdd1.getNumPartitions}") println(rdd1.collect().mkString(",")) sc.stop() /* * 執行結果 * 當前分割槽數 : 2 14,3,4,6,1,10,100 * * */ }

需求 : 從伺服器日誌資料apache.log中獲取2015年5月17日的請求路徑

  object filterTest1 extends App {
    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("groupByTest")

    val sc: SparkContext 
= new SparkContext(sparkconf) val rdd = sc.textFile("Spark_319/src/data/input/apache.log") // 獲取 2015年5月17日 日誌記錄 private val rdd1: RDD[String] = rdd.filter( _.split(" ")(3).split(":")(0) == "17/05/2015" ) // 獲取路徑 private val rdd2: RDD[String] = rdd1.map(_.split(" ").reverse(0)) println(s"當前分割槽數 : ${rdd2.getNumPartitions}") rdd2.collect().foreach(println(_)) sc.stop() }