1. 程式人生 > >spark pairRDD操作

spark pairRDD操作

一、建立pairRDD的方法
①python指令碼,使用 map() 函式 示例把句子的第一個單詞作為鍵,句子作為值:
>>> line=sc.parallelize(["hello world","very good","yes right"]) >>> map = line.map(lambda s:((s.split(" "))[0],s)) >>> map.collect() [('hello', 'hello world'), ('very', 'very good'), ('yes', 'yes right')] >>>
可以看到python中的轉化非常方便,直接返回一個二元組即可。
②使用java, mapToPair( ) 
函式,傳入一個PairFunction物件引數
//使用mapToPair轉化為鍵值對,例如: (world,1) JavaPairRDD<String, Integer> pair = words.mapToPair(new PairFunction<String, String, Integer>() {     public Tuple2<String, Integer> call(String s) throws Exception {         return new Tuple2<String, Integer>(s,1);     } });
在java中,鍵值對RDD的型別為  JavaPairRDD<T1,T2>

③從記憶體中建立pairRDD python和scala只要使用 parallelize()函式傳入鍵值對形式的引數。 python指令碼示例: >>> pair=sc.parallelize([("hello",1),("good",2),("yes",3)]) >>> pair.collect() [('hello', 1), ('good', 2), ('yes', 3)] >>>
java中從記憶體建立pairRDD需要使用 parallelizePairs()
函式 示例: public static void main(String[] args) {     SparkConf conf = new SparkConf().setMaster("local").setAppName("test07");     JavaSparkContext sc = new JavaSparkContext(conf);     List<Tuple2<String,String>> list = new ArrayList<Tuple2<String,String>>();     list.add(new Tuple2<String, String>("hello","hello world"));     list.add(new Tuple2<String, String>("good","good job"));     list.add(new Tuple2<String, String>("hi","hi world"));     JavaPairRDD<String, String> pair = sc.parallelizePairs(list);     System.out.println(pair.collect());     sc.stop(); }

二、pairRDD轉化操作filter()函式同樣可以適用於鍵值對RDD python示例如下: >>> pair.collect() [(1, 2), (2, 3), (1, 4), (3, 4), (3, 5), (5, 5)] >>> pair2=pair.filter(lambda value:value[1]<5) >>> pair2.collect() [(1, 2), (2, 3), (1, 4), (3, 4)] >>>
java示例如下: public static void main(String[] args) {     SparkConf conf = new SparkConf().setMaster("local").setAppName("test08");     JavaSparkContext sc = new JavaSparkContext(conf);     //插入資料     List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();     list.add(new Tuple2<Integer, Integer>(1, 2));     list.add(new Tuple2<Integer, Integer>(2, 4));     list.add(new Tuple2<Integer, Integer>(3, 5));     JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);     //filter篩選     JavaPairRDD<Integer, Integer> filter = pair.filter(new Function<Tuple2<Integer, Integer>, Boolean>() {         public Boolean call(Tuple2<Integer, Integer> t) throws Exception {             return t._2 < 5;         }     });     System.out.println(filter.collect());
    sc.stop(); }
mapValues() 函式可以只對value做對映轉化操作 python指令碼示例: >>> pair.collect() [(1, 2), (2, 3), (1, 4), (3, 4), (3, 5), (5, 5)] >>> pair2=pair.mapValues(lambda v:v+1) >>> pair2.collect() [(1, 3), (2, 4), (1, 5), (3, 5), (3, 6), (5, 6)] >>>
java程式示例: public static void main(String[] args) {     SparkConf conf = new SparkConf().setMaster("local").setAppName("test08");     JavaSparkContext sc = new JavaSparkContext(conf);     //插入資料     List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();     list.add(new Tuple2<Integer, Integer>(1, 2));     list.add(new Tuple2<Integer, Integer>(2, 4));     list.add(new Tuple2<Integer, Integer>(3, 5));     JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);     JavaPairRDD<Integer, Object> result = pair.mapValues(new Function<Integer, Object>() {         public Object call(Integer integer) throws Exception {             return integer*integer;         }     });     System.out.println(result.collect());     sc.stop(); }
③聚合操作 * reduceByKey() 函式可以根據鍵的值做聚合,即把具有相同鍵值的二元組聚合起來。 python shell示例: >>> pair=sc.parallelize([(1,2),(2,3),(1,4),(2,5),(1,9),(3,5),(2,5),(3,6)]) >>> pair2=pair.reduceByKey(lambda x,y:x+y) >>> pair2.collect() [Stage 0:>                                                          (0 + 2) / 2]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 1:>                                                          (0 + 2) / 2]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [(2, 13), (1, 15), (3, 11)] >>>
java程式示例: public static void main(String[] args) {     SparkConf conf = new SparkConf().setMaster("local").setAppName("test10");     JavaSparkContext sc = new JavaSparkContext(conf);     //插入資料     List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();     list.add(new Tuple2<Integer, Integer>(1, 2));     list.add(new Tuple2<Integer, Integer>(1, 3));     list.add(new Tuple2<Integer, Integer>(2, 4));     list.add(new Tuple2<Integer, Integer>(3, 5));     list.add(new Tuple2<Integer, Integer>(2, 1));     JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);     JavaPairRDD<Integer, Integer> result = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {         public Integer call(Integer integer, Integer integer2) throws Exception {             return integer + integer2;         }     });     System.out.println(result.collect());     sc.stop(); }
* countByValue() 函式可以直接對RDD進行統計,返回一個鍵值對,鍵為資料,值為出現次數,例如在單詞計數中可以用到。 python示例: >>> line=sc.textFile("D:\spark\README.md") >>> words=line.flatMap(lambda x : x.split(" ")) >>> res=words.countByValue() >>> res defaultdict(<class 'int'>, {'': 67, 'guide,': 1, 'APIs': 1, 'For': 2, 'name': 1, 'mesos://': 1, 'for': 11, 'tools': 1, 'shell:': 2, 'Pi': 1, 'find': 1, 'sc.parallelize(1': 1, 'Configuration': 1, 'computing': 1, 'start': 1, 'return': 2, 'see': 1, 'directory.': 1, 'Hadoop,': 2, 'rich': 1, 'sc.parallelize(range(1000)).count()': 1, 'graph': 1, 'you': 4, '["Specifying': 1, 'using:': 1, 'package': 1, 'Building': 1, 'how': 2, 'built,': 1, 'several': 1, 'A': 1, 'The': 1, 'documentation,': 1, '"local"': 1, 'module,': 1, 'programming': 1, 'threads.': 1, 'Online': 1, 'scala>': 1, 'a': 8, 'optimized': 1, 'online': 1, 'package.)': 1, './bin/spark-shell': 1, 'distributions.': 1, 'with': 3, '[Configuration': 1, '[project': 2, 'uses': 1, '1000:': 2, '[building': 1, 'version': 1, 'README': 1, 'MASTER=spark://host:7077': 1, 'help': 1, 'file': 1, 'be': 2, 'locally.': 1, 'learning,': 1, 'clean': 1, 'instructions.': 1, '[Apache': 1, 'running': 1, 'build': 3, 'Shell': 2, 'examples': 2, 'MLlib': 1, 'that': 2, 'Maven](http://maven.apache.org/).': 1, 'latest': 1, 'to': 14, 'abbreviated': 1, 'not': 1, 'Spark.': 1, 'Documentation': 1, 'library': 1, 'general': 2, 'fast': 1, 'high-level': 1, 'build/mvn': 1, 'YARN,': 1, 'one': 2, 'this': 1, 'built': 1, 'individual': 1, '(You': 1, 'system': 1, 'Hadoop': 3, '`./bin/run-example': 1, 'wiki](https://cwiki.apache.org/confluence/display/SPARK).': 1, 'easiest': 1, 'tests': 2, 'systems.': 1, 'setup': 1, 'way': 1, 'machine': 1, 'prefer': 1, 'Scala': 2, 'To': 2, '1000).count()': 1, 'which': 2, '>>>': 1, '-DskipTests': 1, 'variable': 1, 'Note': 1, 'Python,': 2, 'Spark"](http://spark.apache.org/docs/latest/building-spark.html).': 1, 'other': 1, 'use': 3, 'storage': 1, 'Streaming': 1, './dev/run-tests': 1, 'Example': 1, 'guidance': 2, '[run': 1, 'set': 2, 'at': 2, 'will': 1, 'documentation': 3, 'Interactive': 2, 'from': 1, 'processing,': 1, 'including': 3, 'talk': 1, 'computation': 1, 'Big': 1, 'And': 1, 'thread,': 1, 'different': 1, 'building': 2, 'can': 6, 'using': 2, 'Tests': 1, 'print': 1, 'run:': 1, 'Scala,': 1, 'stream': 1, 'particular': 2, 'its': 1, 'your': 1, 'must': 1, 'the': 21, 'params': 1, 'Running': 1, 'programs': 2, '"local[N]"': 1, 'changed': 1, 'first': 1, 'following': 2, 'runs.': 1, 'Please': 3, 'SQL': 2, 'You': 3, 'core': 1, 'tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).': 1, 'configure': 1, 'It': 2, 'environment': 1, 'Hive': 2, 'HDFS': 1, 'Testing': 1, 'processing.': 1, 'locally': 2, 'Programs': 1, 'have': 1, 'and': 10, 'Once': 1, 'or': 3, 'higher-level': 1, 'only': 1, 'detailed': 2, 'when': 1, 'Python': 2, 'cluster.': 1, 'instance:': 1, 'also': 4, 'in': 5, 'if': 4, 'same': 1, 'downloaded': 1, 'Versions': 1, 'Alternatively,': 1, './bin/run-example': 2, 'project': 1, 'page](http://spark.apache.org/documentation.html)': 1, '[params]`.': 1, 'Try': 1, 'supports': 2, 'package.': 1, 'About': 1, 'should': 2, 'sample': 1, 'need': 1, 'command,': 2, 'example': 3, 'are': 1, 'MASTER': 1, 'no': 1, 'run': 7, 'GraphX': 1, '["Building': 1, 'DataFrames,': 1, 'distribution': 1, '#': 1, './bin/pyspark': 1, 'engine': 1, 'usage': 1, 'example:': 1, 'analysis.': 1, 'of': 5, 'provides': 1, 'available': 1, 'them,': 1, 'comes': 1, 'graphs': 1, 'Hadoop-supported': 1, 'Many': 1, 'Because': 1, 'Spark](#building-spark).': 1, 'basic': 1, 'data': 1, 'do': 2, 'given.': 1, 'on': 5, 'an': 3, 'versions': 1, 'Guide](http://spark.apache.org/docs/latest/configuration.html)': 1, 'Data.': 1, 'Apache': 1, 'Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)': 1, 'spark://': 1, 'requires': 1, 'class': 2, 'programs,': 1, '##': 8, 'R,': 1, 'refer': 2, 'URL,': 1, 'Java,': 1, '`examples`': 2, '<http://spark.apache.org/>': 1, 'site,': 1, 'contains': 1, 'More': 1, 'through': 1, 'pre-built': 1, 'This': 2, 'against': 1, '"yarn"': 1, 'Spark': 13, 'cluster': 2, 'submit': 1, 'is': 6, 'N': 1, 'Thriftserver': 1, 'web': 1, 'protocols': 1, '<class>': 1, 'overview': 1, 'SparkPi': 2}) >>>
java示例: SparkConf conf = new SparkConf().setMaster("local").setAppName("test06"); JavaSparkContext sc = new JavaSparkContext(conf); //插入測試資料 JavaRDD<String> sentance = sc.parallelize(Arrays         .asList("hello world hi this a", "good yes hello", "world a", "hi very good")); //使用flatMap轉化為單詞 JavaRDD<String> words = sentance.flatMap(new FlatMapFunction<String, String>() {     public Iterable<String> call(String s) throws Exception {         return Arrays.asList(s.split(" "));     } }); Map<String, Long> res = words.countByValue(); System.out.println(res);

* combineByKey() 函式,類似於普通RDD中的聚合函式aggregate(),需要傳入三個引數:第一次遇到某個鍵的初始值設定,再次遇到某個鍵時的聚合函式,多個分片聚合的規則函式 python指令碼示例: >>> pair=sc.parallelize([(1,2),(2,3),(1,3),(2,5),(1,6),(3,5),(5,5),(2,1)])  #輸入資料 >>> com_pair=pair.combineByKey(lambda x:(x,1),lambda x,v:(x[0]+v,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])) #使用combineByKey聚合成為(值,數目)的形式 >>> com_pair.collect() [(2, (9, 3)), (1, (11, 3)), (3, (5, 1)), (5, (5, 1))] >>> res=com_pair.mapValues(lambda x:x[0]/x[1])  #轉化值,計算平均值 >>> res.collect() [(2, 3.0), (1, 3.6666666666666665), (3, 5.0), (5, 5.0)] >>>
java示例: public class TestCombineByKey {
    public static void main(String[] args) {         SparkConf conf = new SparkConf().setMaster("local").setAppName("test10");         JavaSparkContext sc = new JavaSparkContext(conf);         //插入資料         List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();         list.add(new Tuple2<Integer, Integer>(1, 2));         list.add(new Tuple2<Integer, Integer>(1, 3));         list.add(new Tuple2<Integer, Integer>(2, 4));         list.add(new Tuple2<Integer, Integer>(3, 5));         list.add(new Tuple2<Integer, Integer>(2, 1));         JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);         /**          * 當想要按鍵的值計算平均數的時候,reduceByMap()函式就不太好用了,這時候需要使用combineByKey函式          */         JavaPairRDD<Integer, Tuple2<Integer, Integer>> com_pair = pair.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() {  //這個函式用於第一次遇到某個鍵的時候初始化(總值,數目)鍵值對             public Tuple2<Integer, Integer> call(Integer integer) throws Exception {                 return new Tuple2<Integer, Integer>(integer, 1);             }         }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() { //這個函式用於非首次遇到某個鍵的時候的合併操作             public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t, Integer integer) throws Exception {                 return new Tuple2<Integer, Integer>(t._1 + integer, t._2 + 1);             }         }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {             public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {                 return new Tuple2<Integer, Integer>(t1._1 + t2._1, t1._2 + t2._2);             }         });
        JavaPairRDD<Integer, Double> result = com_pair.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {             public Double call(Tuple2<Integer, Integer> t) throws Exception {                 return (double) t._1 / t._2;             }         });
        System.out.println(result.collect());
        sc.stop();     } }
這裡我是用了Tuple2作為儲存(某個鍵總值,數目)的鍵值對,也可以使用自定義的類來做,不過注意:一定要實現序列化介面。 其實這裡求同一鍵值對應的平均數,使用map配合reduceByKey也可以實現,不過這裡使用combineByKey將所有操作都集中到了一個函式的引數中。
在 Spark 中使用這些專用的聚合函式, 始終要比手動將資料分組再歸約快很多。
* groupByKey() 函式可以將鍵值相同的元素的鍵合併,值形成陣列形式: java示例:
public class TestGroupByKey {
    public static void main(String[] args) {         SparkConf conf = new SparkConf().setMaster("local").setAppName("test11");         JavaSparkContext sc = new JavaSparkContext(conf);         //插入資料         List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();         list.add(new Tuple2<Integer, Integer>(1, 2));         list.add(new Tuple2<Integer, Integer>(1, 3));         list.add(new Tuple2<Integer, Integer>(2, 4));         list.add(new Tuple2<Integer, Integer>(3, 5));         list.add(new Tuple2<Integer, Integer>(2, 1));         JavaPairRDD<Integer, Integer> pair = sc.parallelizePairs(list);         JavaPairRDD<Integer, Iterable<Integer>> after_group = pair.groupByKey();         System.out.println(after_group.collect());
        sc.stop();     } }
/* [(1,[2, 3]), (3,[5]), (2,[4, 1])]  */

* keys()values() 函式 python shell示例:
>>> pair.collect() [(1, 2), (2, 3), (3, 4), (1, 3), (2, 5), (2, 6), (1, 2), (3, 9)] >>> pair.keys() PythonRDD[18] at RDD at PythonRDD.scala:43 >>> pair.keys().collect() [1, 2, 3, 1, 2, 2, 1, 3] >>> pair.values().collect() [2, 3, 4, 3, 5, 6, 2, 9] >>>
* sortByKey() 按照鍵的大小排序,如果不傳入排序方式則按照預設的方式排序(我猜是ascii碼值從小到大排序,待驗證) python shell 示例: >>> pair.sortByKey().collect() [Stage 15:>                                                         (0 + 2) / 2]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [Stage 15:=============================>                            (1 + 1) / 2]D:\spark\python\lib\pyspark.zip\pyspark\shuffle.py:58: UserWarning: Please install psutil to have better support with spilling [(1, 2), (1, 3), (1, 2), (2, 3), (2, 5), (2, 6), (3, 4), (3, 9)] >>>