1. 程式人生 > >Java Web提交任務到Spark Spark通過Java Web提交任務

Java Web提交任務到Spark Spark通過Java Web提交任務

相關軟體版本:

Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7

機器:

windows7 (包含JDK1.8,MyEclipse2014,IntelliJ IDEA14,TOmcat7);

centos6.6虛擬機器(Hadoop偽分散式叢集,Spark standAlone叢集,JDK1.8);

centos7虛擬機器(Tomcat,JDK1.8);

1.場景

1. windows簡單Java程式呼叫Spark,執行Scala開發的Spark程式,這裡包含兩種模式:

    1> 提交任務到Spark叢集,使用standAlone模式執行;

    2> 提交任務到Yarn叢集,使用yarn-client的模式;

2. windows 開發Java web程式呼叫Spark,執行Scala開發的Spark程式,同樣包含兩種模式,參考1.

3. Linux執行java web程式呼叫Spark,執行Scala開發的Spark程式,包含兩種模式,參考1.

2.實現

1. 簡單Scala程式,該程式的功能是讀取HDFS中的log日誌檔案,過濾log檔案中的WARN和ERROR的記錄,最後把過濾後的記錄寫入到HDFS中,程式碼如下:

  1. import org.apache.spark.{SparkConf, SparkContext}  
  2. /** 
  3.  * Created by Administrator on 2015/8/23.
     
  4.  */
  5. object Scala_Test {  
  6.   def main(args:Array[String]): Unit ={  
  7.     if(args.length!=2){  
  8.       System.err.println("Usage:Scala_Test <input> <output>")  
  9.     }  
  10.     // 初始化SparkConf
  11.     val conf = new SparkConf().setAppName("Scala filter")  
  12.     val sc = new SparkContext(conf)  
  13.     //  讀入資料
  14.     val lines = sc.textFile(args(0
    ))  
  15.     // 轉換
  16.     val errorsRDD = lines.filter(line => line.contains("ERROR"))  
  17.     val warningsRDD = lines.filter(line => line.contains("WARN"))  
  18.     val  badLinesRDD = errorsRDD.union(warningsRDD)  
  19.     // 寫入資料
  20.     badLinesRDD.saveAsTextFile(args(1))  
  21.     // 關閉SparkConf
  22.     sc.stop()  
  23.   }  
  24. }  
使用IntelliJ IDEA 並打成jar包備用(lz這裡命名為spark_filter.jar);

2.  java呼叫spark_filter.jar中的Scala_Test 檔案,並採用Spark standAlone模式,java程式碼如下:

  1. package test;  
  2. import java.text.SimpleDateFormat;  
  3. import java.util.Date;  
  4. import org.apache.spark.deploy.SparkSubmit;  
  5. /** 
  6.  * @author fansy 
  7.  * 
  8.  */
  9. publicclass SubmitScalaJobToSpark {  
  10.     publicstaticvoid main(String[] args) {  
  11.         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");   
  12.         String filename = dateFormat.format(new Date());  
  13.         String tmp=Thread.currentThread().getContextClassLoader().getResource("").getPath();  
  14.         tmp =tmp.substring(0, tmp.length()-8);  
  15.         String[] arg0=new String[]{  
  16.                 "--master","spark://node101:7077",  
  17.                 "--deploy-mode","client",  
  18.                 "--name","test java submit job to spark",  
  19.                 "--class","Scala_Test",  
  20.                 "--executor-memory","1G",  
  21. //              "spark_filter.jar",
  22.                 tmp+"lib/spark_filter.jar",//
  23.                 "hdfs://node101:8020/user/root/log.txt",  
  24.                 "hdfs://node101:8020/user/root/badLines_spark_"+filename  
  25.         };  
  26.         SparkSubmit.main(arg0);  
  27.     }  
  28. }  

具體操作,使用MyEclipse新建java web工程,把spark_filter.jar 以及spark-assembly-1.4.1-hadoop2.6.0.jar(該檔案在Spark壓縮檔案的lib目錄中,同時該檔案較大,拷貝需要一定時間) 拷貝到WebRoot/WEB-INF/lib目錄。(注意:這裡可以直接建立java web專案,在測試java呼叫時,直接執行java程式碼即可,在測試web專案時,開啟tomcat即可)

java呼叫spark_filter.jar中的Scala_Test 檔案,並採用Yarn模式。採用Yarn模式,不能使用簡單的修改master為“yarn-client”或“yarn-cluster”,在使用Spark-shell或者spark-submit的時候,使用這個,同時配置HADOOP_CONF_DIR路徑是可以的,但是在這裡,讀取不到HADOOP的配置,所以這裡採用其他方式,使用yarn.Clent提交的方式,java程式碼如下:

  1. package test;  
  2. import java.text.SimpleDateFormat;  
  3. import java.util.Date;  
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.spark.SparkConf;  
  6. import org.apache.spark.deploy.yarn.Client;  
  7. import org.apache.spark.deploy.yarn.ClientArguments;  
  8. publicclass SubmitScalaJobToYarn {  
  9.     publicstaticvoid main(String[] args) {  
  10.         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");   
  11.         String filename = dateFormat.format(new Date());  
  12.         String tmp=Thread.currentThread().getContextClassLoader().getResource("").getPath();  
  13.         tmp =tmp.substring(0, tmp.length()-8);  
  14.         String[] arg0=new String[]{  
  15.                 "--name","test java submit job to yarn",  
  16.                 "--class","Scala_Test",  
  17.                 "--executor-memory","1G",  
  18. //              "WebRoot/WEB-INF/lib/spark_filter.jar",//
  19.                 "--jar",tmp+"lib/spark_filter.jar",//
  20.                 "--arg","hdfs://node101:8020/user/root/log.txt",  
  21.                 "--arg","hdfs://node101:8020/user/root/badLines_yarn_"+filename,  
  22.                 "--addJars","hdfs://node101:8020/user/root/servlet-api.jar",//  
  23.                 "--archives","hdfs://node101:8020/user/root/servlet-api.jar"//  
  24.         };  
  25. //      SparkSubmit.main(arg0);
  26.         Configuration conf = new Configuration();  
  27.         String os = System.getProperty("os.name");  
  28.         boolean cross_platform =false;  
  29.         if(os.contains("Windows")){  
  30.             cross_platform = true;  
  31.         }  
  32.         conf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平臺提交任務
  33.         conf.set("fs.defaultFS""hdfs://node101:8020");// 指定namenode  
  34.         conf.set("mapreduce.framework.name","yarn"); // 指定使用yarn框架
  35.         conf.set("yarn.resourcemanager.address","node101:8032"); // 指定resourcemanager
  36.         conf.set("yarn.resourcemanager.scheduler.address""node101:8030");// 指定資源分配器
  37.         conf.set("mapreduce.jobhistory.address","node101:10020");  
  38.          System.setProperty("SPARK_YARN_MODE""true");  
  39.          SparkConf sparkConf = new SparkConf();  
  40.          ClientArguments cArgs = new ClientArguments(arg0, sparkConf);  
  41.         new Client(cArgs,conf,sparkConf).run();  
  42.     }  
  43. }  
3. java web測試 任務提交到Spark的兩種模式,這裡採用最簡單的方式,直接配置servlet,其web.xml檔案如下:
  1. <?xmlversion="1.0"encoding="UTF-8"?>
  2. 相關推薦

    Spark通過Java Web提交任務

    相關軟體版本: Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7 機器: windows7 (包含JDK1.8,MyEclipse2014,I

    Java Web提交任務Spark Spark通過Java Web提交任務

    相關軟體版本:Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7機器:windows7 (包含JDK1.8,MyEclipse2014,IntelliJ IDEA14

    Java Web提交任務Spark

    相關軟體版本:Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7機器:windows7 (包含JDK1.8,MyEclipse2014,IntelliJ IDEA14

    spark submit提交任務報錯Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/stream

    1.問題描述 提交spark任務: bin/spark-submit --master local[2] \ --class _0924MoocProject.ImoocStatStreamingApp_product \ /opt/datas/project/scala

    spark系列-應用篇之通過yarn api提交Spark任務

    前言 在工作中,大部使用的都是hadoop和spark的shell命令,或者通過java或者scala編寫程式碼。最近工作涉及到通過yarn api處理spark任務,感覺yarn的api還是挺全面的,但是呼叫時需要傳入很多引數,而且會出現一些詭異的問題。雖然

    hive on spark通過YARN-client提交任務不成功

    在Hive on spark中  設定spark.master 為 yarn-client , 提交HiveQL任務到yarn上執行,發現任務無法執行 輸出一下錯誤: 可以看到 Hive on S

    Spark通過Jar包提交任務

    Standalone需要通過bin/spark-submit來提交必需引數 --class --master舉例如下:/home/bigdata/hadoop/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \ --class org.a

    Spark中使用Java編程的常用方法

    廣播 新的 json lambda表達式 aslist rom collect spl nal 原文引自:http://blog.sina.com.cn/s/blog_628cc2b70102w9up.html 一、初始化SparkContext System.setPr

    通過java web代理將docker容器配置為jenkins節點

    1、安裝centos映象 docker search centos                  //查詢映象 docker pull centos      &nbs

    Caused by: java.io.NotSerializableException: org.apache.spark.unsafe.types.UTF8String$IntWrapper

    菜雞一隻!如果有什麼說錯的還請大家指出批評,堅決改正!! 遇到了一個挺詭異的報錯把,反正比較無語,發現國內網站完全搜不到這個報錯的解決方法,因此在這裡記錄下!! 1、環境: 這是一個spark的Task not serializable問題,因此只需要關注spark的版本就好了,我的版本是

    Spark 執行問題 java.lang.NoSuchMethodError: scala.Predef 解決方案

    idea中如果遇到這種問題,一般查詢和spark匹配的scala版本就能解決 如果不能解決 請開啟專案的iml檔案,去掉不同版本的scala的orderEntry就能解決。 另在mac中通常會有問題no snappyjava in java.library.path 解決方案如下 1.

    maven環境下使用java、scala混合開發spark應用

    熟悉java的開發者在開發spark應用時,常常會遇到spark對java的介面文件不完善或者不提供對應的java介面的問題。這個時候,如果在java專案中能直接使用scala來開發spark應用,同時使用java來處理專案中的其它需求,將在一定程度上降低開發spark專案的

    Spark,SparkSql wordCount,java wordcount

    +-----------------+---+ |             name|cnt| +-----------------+---+ |                 | 68| |              the| 22| |         

    最新美團點評Java團隊面試題:Spark+JDK ZGC+演算法+HashMap+Redis

    技術面(一、二、三面) Java 有什麼鎖型別? 有了解Spark嗎?Spark為什麼比Hadoop要快? 談談poll和epoll,epoll是同步還是非同步 JMM、老年代在什麼情況下會觸發GC、對老年代的GC會不會導致程式卡頓?(最優吞吐量和最短停頓時間)

    Spark機器學習(java):ALS交替最小二乘演算法

    楔子 Spark機器學習,推薦電影,採用ALS交替最小二乘演算法 Spark中ml和mllib的區別 Spark機器學習(10):ALS交替最小二乘演算法 demo import java.io.Serializable; import org.apach

    Scala和Java二種方式實戰Spark Streaming開發

    在這裡我主要借鑑課上老師講的以及官網的API來進行簡單的Spark Streaming的開發: 一:java形式: 1.我們可以總結一下步驟: 第一步:建立SparkConf物件 第二步:建立SparkStreamingContext 第三步:建立愛你

    spark報錯java.lang.OutOfMemoryError: Java heap space

    針對spark報錯: java.lang.OutOfMemoryError: Java heap space 解決方式:     在spark/conf/spark-env.sh中加大SPARK_WORKER_MEMORY值,如下,我加大至6GB export SPAR

    Spark SparkJava操作實現

    編譯出來的包位於target目錄下,pro-spark-example-2.2.0.jar 啟動spark叢集 [[email protected] ~]# cd /usr/local/spark-2.2.0-bin-hadoop2.6/sbin [[email protect

    Sparkjava.net.URISyntaxException: Relative path in absolute URI: file:D:/XXX/spark-spark-warehouse

    windows下使用intellij 開發 spark mllib 程式 發現如下錯誤。 var spark=SparkSession.builder().master("local").appName("test").getOrCreate() 執行時

    Spark升級到2.0後測試stream-kafka測試報java.lang.NoClassDefFoundError: org/apache/spark/Logging錯誤

    - 最近從Spark 1.5.2升級到2.0之後,執行測試程式碼spark-stream-kafka報以下錯誤: java.lang.NoClassDefFoundError: org/apache/spark/Logging at java.lang.ClassLo