Spark學習——RDD基本操作
阿新 • • 發佈:2019-02-02
Spark操作分為transformation和action,現將常用的記錄在此:
//transformation //將元素一個一個轉換 JavaRDD<String> map = raw.map(new Function<String, String>() { @Override public String call(String string) throws java.lang.Exception { return string+"!"; } }); //返回結果依true、false結果來定 JavaRDD<String> filter = raw.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws java.lang.Exception { return s.length()>11; } }); //將每一個元素先進行內部處理,再講所有部分經過處理的元素再合併 JavaRDD<String> flatmap = raw.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws java.lang.Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //該函式和map函式類似,只不過對映函式的引數由RDD中的每一個元素變成了RDD中每一個分割槽的迭代器。 //如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效的多。例如在寫入資料庫 //時,map會為每一個partition建立一個connection,開銷大;而用mapPartitions則會為每個partition //建立一個connection JavaRDD<String> mapPartitions = raw.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { @Override public Iterator<String> call(Iterator<String> stringIterator) throws java.lang.Exception { int i = 0; List<String> cur = new ArrayList<String>(); while(stringIterator.hasNext()){ cur.add(stringIterator.next()+i); i++; } return cur.iterator(); } }); //mapPartitionsWithIndex與mapPartition基本相同,只是在處理函式的引數是一個二元元組,元組的第一 // 個元素是當前處理的分割槽的index,元組的第二個元素是當前處理的分割槽元素組成的Iterator JavaRDD<String> mapPartitionsWithIndex = raw.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() { @Override public Iterator<String> call(Integer v1, Iterator<String> v2) throws java.lang.Exception { List<String> cur = new ArrayList<String>(); while(v2.hasNext()){ cur.add(v2.next()+v1); } return cur.iterator(); } },true); //聯結兩個RDD JavaRDD<String> union = map.union(mapPartitions); //求兩個RDD的交集 JavaRDD<String> intersection = raw.intersection(map); //去重 JavaRDD<String> distinct = raw.distinct(); //轉為pair JavaPairRDD<Integer,String> mapToPair = raw.mapToPair(new PairFunction<String, Integer, String>() { @Override public Tuple2<Integer, String> call(String s) throws java.lang.Exception { if(s.length() < 11){ return new Tuple2<Integer, String>(1,s); }else{ return new Tuple2<Integer, String>(2,s); } } }); //按照key來歸併,返回的RDD中value為Iterable型別的 JavaPairRDD<Integer,Iterable<String>> groupByKey = mapToPair.groupByKey(); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } //按照key來進行聚合 JavaPairRDD<Integer,String> reduceByKey = mapToPair.reduceByKey(new Function2<String, String, String>() { @Override public String call(String s, String s2) throws java.lang.Exception { return s + " " + s2; } }); //按照key來聚合,首先第一個方法在當前分段內部完成按key聚合,然後再將所有分段按key進行合併 JavaPairRDD<Integer,String> aggregateByKey = mapToPair.aggregateByKey("test", new Function2<String, String, String>() { @Override public String call(String s, String s2) throws java.lang.Exception { System.out.println("1:"+s+" "+s2); return s+s2; } }, new Function2<String, String, String>() { @Override public String call(String s, String s2) throws java.lang.Exception { System.out.println("2:"+s+" "+s2); return s+s2; } }); //按key排序 JavaPairRDD<Integer,String> sortByKey = mapToPair.sortByKey(); //聯結兩個RDD,放入一個二元tuple JavaPairRDD<Integer,Tuple2<String,String>> join = mapToPair.join(reduceByKey); //action: //以陣列的形式返回一個數據集 List<String> res = distinct.collect(); //統計數量 long num = mapPartitions.count(); //返回第一個元素 String first = map.first(); //take(n)返回前n個元素 List<String> n = map.take(3); //隨機抽樣,返回一個包含 num 個隨機抽樣(random sample)元素的陣列,引數 withReplacement //指定是否有放回抽樣,引數 seed 指定生成隨機數的種子 List<String> sample = map.takeSample(true, 3, 7); //saveAsTextFile(path):將資料集中的元素以文字檔案(或文字檔案集合)的形式寫入本地檔案系統、 // HDFS 或其它 Hadoop 支援的檔案系統中的給定目錄中。Spark 將對每個元素呼叫 toString 方法, // 將資料元素轉換為文字檔案中的一行記錄。 //saveAsSequenceFile(path):將資料集中的元素以 Hadoop SequenceFile 的形式寫入到本地檔案系統、 // HDFS 或其它 Hadoop 支援的檔案系統指定的路徑中。該操作可以在實現了 Hadoop 的 Writable 介面 // 的鍵值對(key-value pairs)的 RDD 上使用。在 Scala 中,它還可以隱式轉換為 Writable 的型別 //saveAsObjectFile(path) :使用 Java 序列化(serialization)以簡單的格式(simple format) //編寫資料集的元素,然後使用 SparkContext.objectFile() 進行載入。 //僅適用於(K,V)型別的 RDD 。返回具有每個 key 的計數的 (K , Int)對 的 hashmap。 Map<Integer, Long> resultMap = mapToPair.countByKey(); //遍歷 map.foreach(new VoidFunction<String>() { @Override public void call(String s) throws Exception { System.out.println(s); } });