TOP N
When the data volume is small:
scala> numrdd.sortBy(x=>x,false).take(3)
res17: Array[Int] = Array(100, 99, 98)
scala> numrdd.sortBy(x=>x,true).take(3)
res18: Array[Int] = Array(1, 2, 3)
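For a small data set like this, Spark's built-in actions do the same job without a full sort. The snippet below is a minimal sketch (not part of the original session), assuming the same numrdd as above, which apparently holds the integers 1 to 100:

// top(n) returns the n largest elements in descending order,
// keeping only n candidates per partition instead of sorting everything
val top3 = numrdd.top(3)          // expected: Array(100, 99, 98)
// takeOrdered(n) returns the n smallest elements in ascending order
val low3 = numrdd.takeOrdered(3)  // expected: Array(1, 2, 3)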
When the data is very large and a single server's memory cannot handle the top-N computation, Spark reads the data from HDFS and, following the data-locality principle, loads it onto different nodes. We can use mapPartitions to compute the top N within each partition, and then sort those per-partition candidates once more to obtain the top N of the whole data set.
scala> val numrdd=sc.makeRDD(1 to 100000000,20) // e.g. 100 million numbers; in practice the data could be far larger
numrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24
scala> numrdd.mapPartitions(x=>{val arr=x.toArray;val aa=arr.sorted;aa.reverseIterator.take(5)}).collect
res2: Array[Int] = Array(5000000, 4999999, 4999998, 4999997, 4999996, 10000000, 9999999, 9999998, 9999997, 9999996, 15000000, 14999999, 14999998, 14999997, 14999996, 20000000, 19999999, 19999998, 19999997, 19999996, 25000000, 24999999, 24999998, 24999997, 24999996, 30000000, 29999999, 29999998, 29999997, 29999996, 35000000, 34999999, 34999998, 34999997, 34999996, 40000000, 39999999, 39999998, 39999997, 39999996, 45000000, 44999999, 44999998, 44999997, 44999996, 50000000, 49999999, 49999998, 49999997, 49999996, 55000000, 54999999, 54999998, 54999997, 54999996, 60000000, 59999999, 59999998, 59999997, 59999996, 65000000, 64999999, 64999998, 64999997, 64999996, 70000000, 69999999, 69999998, 69999997, 69999996, 75000000, 74999999, 74999998, 74999997, 74999996, 80000000, 79999999, 79999998, 7...
scala> val maprdd=numrdd.mapPartitions(x=>{val arr=x.toArray;val aa=arr.sorted;aa.reverseIterator.take(5)})
maprdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:26
scala> maprdd.sortBy(x=>x,false).take(5)
[Stage 3:==========================================> (15 + 5) / 20]
18/08/31 18:05:20 WARN spark.HeartbeatReceiver: Removing executor 1 with no recent heartbeats: 166889 ms exceeds timeout 120000 ms
18/08/31 18:05:25 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on 192.168.53.122: Executor heartbeat timed out after 166889 ms
18/08/31 18:05:31 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 50, 192.168.53.122, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166889 ms
18/08/31 18:05:31 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 3.0 (TID 53, 192.168.53.122, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 166889 ms
[Stage 3:===================================================> (18 + 2) / 20]
18/08/31 18:06:19 WARN spark.HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 156368 ms exceeds timeout 120000 ms
18/08/31 18:06:23 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on 192.168.53.122: Executor heartbeat timed out after 156368 ms
18/08/31 18:06:23 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 3.0 (TID 55, 192.168.53.122, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 156368 ms
18/08/31 18:06:27 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 3.0 (TID 52, 192.168.53.122, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 156368 ms
18/08/31 18:06:32 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on 192.168.53.122: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/08/31 18:06:33 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/1
[Stage 4:> (0 + 2) / 20]
18/08/31 18:06:44 ERROR scheduler.TaskSchedulerImpl: Lost executor 0 on 192.168.53.122: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/08/31 18:06:45 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/0
[Stage 4:=========================================================(20 + 0) / 20]
18/08/31 18:09:42 WARN master.Master: Removing worker-20180831175320-192.168.53.122-55296 because we got no heartbeat in 60 seconds
18/08/31 18:09:43 WARN master.Master: Removing worker-20180831175320-192.168.53.122-59602 because we got no heartbeat in 60 seconds
18/08/31 18:09:43 WARN master.Master: Removing worker-20180831175320-192.168.53.122-56119 because we got no heartbeat in 60 seconds
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-59602. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-55296. Asking it to re-register.
18/08/31 18:09:44 WARN master.Master: Got heartbeat from unregistered worker worker-20180831175320-192.168.53.122-56119. Asking it to re-register.
... (the re-register warning repeats many more times for the three workers) ...
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 2 on 192.168.53.122: worker lost
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 3 on 192.168.53.122: worker lost
18/08/31 18:09:45 ERROR scheduler.TaskSchedulerImpl: Lost executor 4 on 192.168.53.122: worker lost
18/08/31 18:09:57 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding Selector sun.nio.ch.EPollSelectorImpl@2972d788.
18/08/31 18:09:57 WARN nio.NioEventLoop: Selector.select() returned prematurely 512 times in a row; rebuilding Selector sun.nio.ch.EPollSelectorImpl@6763b7c7.
18/08/31 18:09:59 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/3
18/08/31 18:09:59 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/2
18/08/31 18:10:01 WARN master.Master: Got status update for unknown executor app-20180831175321-0000/4
18/08/31 18:10:19 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 5.0 (TID 94, 192.168.53.122, executor 5): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
)
res3: Array[Int] = Array(100000000, 99999999, 99999998, 99999997, 99999996)
Because this was a single-machine test with insufficient memory, the job only finished, with difficulty, after several retries.
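One likely cause of the memory pressure is that x.toArray materializes an entire partition (about 5 million integers here) before sorting it. Below is a minimal sketch, not from the original session, of a more memory-friendly first phase: each partition keeps at most N candidates in a bounded min-heap, so only 20 * N values ever reach the driver.

import scala.collection.mutable

val N = 5
val candidates = numrdd.mapPartitions { iter =>
  // bounded min-heap: with the reversed ordering, heap.head is the
  // smallest of the values kept so far
  val heap = mutable.PriorityQueue.empty[Int](Ordering[Int].reverse)
  iter.foreach { v =>
    if (heap.size < N) heap.enqueue(v)
    else if (v > heap.head) { heap.dequeue(); heap.enqueue(v) }
  }
  heap.iterator
}
// the driver only has to sort 20 * N candidates, so the final step is cheap
val topN = candidates.collect().sorted(Ordering[Int].reverse).take(N)

Spark's own numrdd.top(N) action follows essentially the same per-partition bounding idea.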