1. 程式人生 > >取數據超過內存限制的問題-解決方案(sample,takeSample,filter)

取數據超過內存限制的問題-解決方案(sample,takeSample,filter)

cau red 所有 cep 調用 ext log 源碼 rsa

遇到的問題

在處理數據過程中,遇到需要取(n)個數的問題,而當樣本量過大的時候,就不能簡單的take(n),這類問題一般有兩種情況:
> - 有序取 TopN
> - 無序取 N


先來討論無序取N的情況:

  • sample函數
    • sample(boolean, fraction,seed) : 按比例抽取
    • 返回一個新的RDD

      withReplacement:元素可以多次抽樣(在抽樣時替換)
      • withReplacement=true,表示有放回的抽樣
      • withReplacement=false,表示無放回的抽樣

       
      fraction:期望樣本的大小作為RDD大小的一部分, 當withReplacement=false時:選擇每個元素的概率;分數一定是[0,1] ; 當withReplacement=true時:選擇每個元素的期望次數; 分數必須大於等於0
      seed

      :隨機數生成器的種子

圖 11中 的 每 個 方 框 是 一 個 RDD 分 區。 通 過 sample 函 數, 采 樣 50% 的 數 據。V1、 V2、 U1、 U2、U3、U4 采樣出數據 V1 和 U1、 U2 形成新的 RDD
技術分享圖片

圖 sample 算子對 RDD 轉換

  • takeSample函數
    • takeSample(boolean, sampleNum,seed) : 按固定數量抽取
    • 返回一個Array[T]; 該方法僅在預期結果數組很小的情況下使用,因為所有數據都被加載到driver的內存中
    • takeSample函數先是計算fraction,也就是采樣比例,然後調用sample函數進行采樣,並對采樣後的數據進行collect(),最後調用take函數返回num個元素

withReplacement:元素可以多次抽樣(在抽樣時替換)

  • withReplacement=true,表示有放回的抽樣
  • withReplacement=false,表示無放回的抽樣

num:返回的樣本的大小
seed:隨機數生成器的種子

技術分享圖片

圖 takeSample算子對RDD轉換


再來看一下有序取 TopN的情況:

  • filter函數
    • 函數功能是對元素進行過濾,對每個元素應用 f 函 數,返回值為 true 的元素在RDD中保留,返回值為 false 的元素將被過濾掉。 內 部 實 現 相 當 於 生 成 FilteredRDD(this,sc.clean(f))
    • 若是單列無法過濾,可以手動設置過濾位
    • 有點遺憾的是無法準確的取固定量的數

 圖中每個方框代表一個 RDD 分區, T 可以是任意的類型。通過用戶自定義的過濾函數 f,對每個數據項操作,將滿足條件、返回結果為 true 的數據項保留。例如,過濾掉 V2 和 V3 保留了 V1,為區分命名為 V‘1。
技術分享圖片

  圖 filter 算子對 RDD 轉換


附:takeSample源碼

def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] =
    {
        val numStDev = 10.0
        if (num < 0) {
            throw new IllegalArgumentException("Negative number of elements requested")
        } else if (num == 0) {
            return new Array[T](0)
        }
        val initialCount = this.count()
        if (initialCount == 0) {
            return new Array[T](0)
        }
        val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
        if (num > maxSampleSize) {
            throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - "         + s"$numStDev * math.sqrt(Int.MaxValue)")
        }
        val rand = new Random(seed)
        if (!withReplacement && num >= initialCount) {
            return Utils.randomizeInPlace(this.collect(), rand)
        }
        val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,        withReplacement)
        var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
        // If the first sample didn‘t turn out large enough, keep trying to take samples;
        // this shouldn‘t happen often because we use a big multiplier for the initial size
        var numIters = 0
        while (samples.length < num) {
            logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
            samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
            numIters += 1
        }
        Utils.randomizeInPlace(samples, rand).take(num)
}

取數據超過內存限制的問題-解決方案(sample,takeSample,filter)