
A correct demo of submitting Spark to YARN

When submitting with spark-submit from the Xshell command line, a cluster with Kerberos enabled requires the authentication to be performed inside the packaged jar. The authentication files (e.g. the keytab) must be uploaded and distributed to every node, and passwordless SSH login must be configured between the nodes.
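
The KerberosService helper imported by the demo below is not shown in the original post. The following is only a minimal sketch of what such a helper usually does with Hadoop's UserGroupInformation; the principal and keytab path are placeholders and must match the files that were distributed to the nodes.

package kerberos;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosService {

    // Logs the process in from a keytab so that subsequent HDFS/YARN calls are authenticated.
    public static void login() throws IOException {
        Configuration conf = new Configuration();
        // Tell the Hadoop client libraries that the cluster uses Kerberos.
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // Placeholder principal and keytab path; the keytab must exist at this path on every node.
        UserGroupInformation.loginUserFromKeytab(
                "sparkuser@EXAMPLE.COM",
                "/etc/security/keytabs/sparkuser.keytab");
    }
}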

Because the program is submitted through spark-submit, the SparkConf in the code is set with

.setMaster("yarn-cluster")

If the submission reports ClassNotFound, the current user may lack permission on the jar that was uploaded to the cluster, or the --master yarn-cluster option may be missing. Here --master yarn-cluster has to be consistent with .setMaster("yarn-cluster") in the code; otherwise connection exceptions occur between the nodes and Xshell keeps showing the application as ACCEPTED.
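
A way to avoid the mismatch entirely (not used in this demo) is to not call setMaster() in the code at all and let spark-submit's --master option decide. A minimal sketch, using the hypothetical class name WordCountNoMaster:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCountNoMaster {
    public static void main(String[] args) {
        // No setMaster() here: the master comes from spark-submit's --master option,
        // so the command line and the code cannot disagree.
        SparkConf conf = new SparkConf().setAppName("WordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... same word-count logic as in the full class below ...
        sc.close();
    }
}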

The spark-submit command is:

 spark-submit  \
 --class sparkDemo.WordCount \
 --master yarn-cluster \
 --num-executors 5 \
 --driver-memory 5g \
 --driver-cores 4 \
 --executor-memory 10g \
 --executor-cores 5 \
  hdfs://1.2.3.4:8020/bulkload/Jars/sub/SparkDemo.jar

The class to be packaged:

package sparkDemo;

import java.util.Arrays;

import kerberos.KerberosService; // project-specific Kerberos helper (implementation not shown in the post)

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import common.HadoopUtil;

public class WordCount {

    public static void main(String[] args) {

        // The master set here must match the --master option passed to spark-submit.
        SparkConf conf_ = new SparkConf()
                .setMaster("yarn-cluster")
                .setAppName("WordCount");

        JavaSparkContext sc = new JavaSparkContext(conf_);
        // HadoopUtil.hdfs_url is a project-specific constant holding the HDFS URI (e.g. hdfs://namenode:8020).
        JavaRDD<String> text = sc.textFile(HadoopUtil.hdfs_url + "/bulkload/Spark_in");

        System.out.println("ok");

        // Split every line into words.
        JavaRDD<String> words = text.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts per word.
        JavaPairRDD<String, Integer> results = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        });

        // Swap to (count, word) so the pairs can be sorted by count.
        JavaPairRDD<Integer, String> temp = results.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<Integer, String>(tuple._2, tuple._1);
            }
        });

        // Sort by count in descending order, then swap back to (word, count).
        JavaPairRDD<String, Integer> sorted = temp.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return new Tuple2<String, Integer>(tuple._2, tuple._1);
            }
        });

        // In yarn-cluster mode this println runs on the executors, so the output
        // shows up in the executor logs rather than in the submitting shell.
        sorted.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println("word:" + tuple._1 + " count:" + tuple._2);
            }
        });

        sc.close();
    }
}
As for the Spark dependency: importing spark-assembly-1.5.2-hadoop2.6.0.jar into the project is enough.