A demo of correctly submitting Spark to YARN
When submitting from the command line with spark-submit in Xshell, a cluster with Kerberos enabled requires the packaged jar to perform the authentication itself. The authentication files (keytab etc.) must be uploaded and distributed to every node, and the nodes need passwordless SSH login between them.
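The code below imports a kerberos.KerberosService that the post never shows. A minimal sketch of what such a helper typically looks like, built on Hadoop's UserGroupInformation API (the class name, method signature, and arguments here are assumptions, not the author's code):

package kerberos;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosService {

    // principal and keytabPath are placeholders; the real values depend on the
    // cluster, and the keytab must exist at the same path on every node.
    public static void login(String principal, String keytabPath) throws IOException {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // Authenticate this JVM against the KDC using the distributed keytab.
        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
    }
}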
Because the program is submitted through spark-submit, the SparkConf in the code is set to
.setMaster("yarn-cluster")
If the submission reports ClassNotFound, it may be that the current user has no permission to access the packaged jar on the cluster, or that the --master yarn-cluster option is missing. Here --master yarn-cluster must agree with .setMaster("yarn-cluster") in the code; otherwise Connection exceptions occur between the nodes while Xshell keeps showing the application as ACCEPTED.
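A common alternative (not what this post does) that removes any chance of a mismatch is to leave the master out of the code entirely, so the --master flag on the command line is the only source of truth:

SparkConf conf = new SparkConf().setAppName("parseAttachment"); // no setMaster in code
JavaSparkContext sc = new JavaSparkContext(conf);               // --master yarn-cluster on the CLI decides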
The spark-submit command:
spark-submit \
--class sparkDemo.WordCount \
--master yarn-cluster \
--num-executors 5 \
--driver-memory 5g \
--driver-cores 4 \
--executor-memory 10g \
--executor-cores 5 \
hdfs://1.2.3.4:8020/bulkload/Jars/sub/SparkDemo.jar
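Because the job runs in yarn-cluster mode, the System.out.println output does not come back to the Xshell session; it goes to the YARN container logs. A standard way to read it after the run (plain YARN CLI; the application id is whatever spark-submit reported):

yarn logs -applicationId <application_id>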
The class to be packaged:
As for the Spark dependency: putting spark-assembly-1.5.2-hadoop2.6.0.jar on the build path is enough. (Note the class is named WordCount here so that it matches --class sparkDemo.WordCount in the spark-submit command; a mismatch there is exactly the kind of thing that produces the ClassNotFound error discussed above.)

package sparkDemo;

import java.util.Arrays;

import kerberos.KerberosService;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import common.HadoopUtil;

public class WordCount {

    public static void main(String[] args) {
        // On a Kerberos-secured cluster, authenticate first (see the
        // KerberosService sketch above) before touching HDFS.
        SparkConf conf_ = new SparkConf()
                .setMaster("yarn-cluster")   // must match --master on the CLI
                .setAppName("parseAttachment");
        JavaSparkContext sc = new JavaSparkContext(conf_);

        // Read the input text from HDFS.
        JavaRDD<String> text = sc.textFile(HadoopUtil.hdfs_url + "/bulkload/Spark_in");
        System.out.println("ok");

        // Split each line into words.
        JavaRDD<String> words = text.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to a (word, 1) pair.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Sum the counts per word.
        JavaPairRDD<String, Integer> results = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer value1, Integer value2) throws Exception {
                return value1 + value2;
            }
        });

        // Swap to (count, word) so the pairs can be sorted by count.
        JavaPairRDD<Integer, String> temp = results.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<Integer, String>(tuple._2, tuple._1);
            }
        });

        // Sort descending by count, then swap back to (word, count).
        JavaPairRDD<String, Integer> sorted = temp.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return new Tuple2<String, Integer>(tuple._2, tuple._1);
            }
        });

        // In yarn-cluster mode these printlns land in the executors'
        // container logs, not in the shell that ran spark-submit.
        sorted.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> tuple) throws Exception {
                System.out.println("word:" + tuple._1 + " count:" + tuple._2);
            }
        });

        sc.close();
    }
}
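The post also references common.HadoopUtil without showing it. A minimal stand-in so the class above compiles (hypothetical; the field name comes from the code, and the address reuses the NameNode URL from the spark-submit command):

package common;

public class HadoopUtil {
    // Placeholder NameNode address taken from the jar path above; adjust for the real cluster.
    public static final String hdfs_url = "hdfs://1.2.3.4:8020";
}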