Submitting Spark Jobs from a Java Web Application
Software versions:
Spark 1.4.1, Hadoop 2.6, Scala 2.10.5, MyEclipse 2014, IntelliJ IDEA 14, JDK 1.8, Tomcat 7
Machines:
Windows 7 (with JDK 1.8, MyEclipse 2014, IntelliJ IDEA 14, Tomcat 7);
CentOS 6.6 VM (Hadoop pseudo-distributed cluster, Spark standalone cluster, JDK 1.8);
CentOS 7 VM (Tomcat, JDK 1.8);
1. Scenarios:
1. A plain Java program on Windows calls Spark to run a Spark job written in Scala, in two modes:
1> submit the job to the Spark cluster and run it in standalone mode;
2> submit the job to the YARN cluster in yarn-client mode;
2. A Java web application developed on Windows calls Spark to run the same Scala job, in the two modes of 1.
3. The Java web application runs on Linux and calls Spark, again in the two modes of 1.
2. Implementation:
1. A simple Scala program that reads a log file from HDFS, filters out the WARN and ERROR records, and writes the filtered records back to HDFS. The code is as follows:
Build it with IntelliJ IDEA and package it as a jar for later use (named spark_filter.jar here):

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2015/8/23.
 */
object Scala_Test {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: Scala_Test <input> <output>")
      System.exit(1) // exit instead of falling through to args(0)
    }
    // initialize the SparkContext
    val conf = new SparkConf().setAppName("Scala filter")
    val sc = new SparkContext(conf)
    // read the input
    val lines = sc.textFile(args(0))
    // transformations: keep only the ERROR and WARN records
    val errorsRDD = lines.filter(line => line.contains("ERROR"))
    val warningsRDD = lines.filter(line => line.contains("WARN"))
    val badLinesRDD = errorsRDD.union(warningsRDD)
    // write the output
    badLinesRDD.saveAsTextFile(args(1))
    // shut down the SparkContext
    sc.stop()
  }
}
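The job's core is plain string matching. As a quick illustration (not part of the project), the same WARN/ERROR selection can be sketched in plain Java, without Spark:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterSketch {
    // The same per-line predicate the Spark job applies: keep WARN or ERROR records.
    static List<String> badLines(List<String> lines) {
        return lines.stream()
                .filter(l -> l.contains("ERROR") || l.contains("WARN"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "2015-08-23 INFO starting up",
                "2015-08-23 WARN low memory",
                "2015-08-23 ERROR task failed");
        System.out.println(badLines(log));
        // → [2015-08-23 WARN low memory, 2015-08-23 ERROR task failed]
    }
}
```

Note one small difference: the Spark job unions two separately filtered RDDs, so a line containing both WARN and ERROR would appear twice in its output, while this sketch emits it once.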
2. Java code that calls Scala_Test in spark_filter.jar using Spark standalone mode:
package test;

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.spark.deploy.SparkSubmit;

/**
 * @author fansy
 */
public class SubmitScalaJobToSpark {
    public static void main(String[] args) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
        String filename = dateFormat.format(new Date());
        String tmp = Thread.currentThread().getContextClassLoader().getResource("").getPath();
        tmp = tmp.substring(0, tmp.length() - 8); // strip the trailing "classes/" to reach WEB-INF/
        String[] arg0 = new String[]{
                "--master", "spark://node101:7077",
                "--deploy-mode", "client",
                "--name", "test java submit job to spark",
                "--class", "Scala_Test",
                "--executor-memory", "1G",
                tmp + "lib/spark_filter.jar",
                "hdfs://node101:8020/user/root/log.txt",
                "hdfs://node101:8020/user/root/badLines_spark_" + filename
        };
        SparkSubmit.main(arg0);
    }
}
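The tmp.substring(0, tmp.length() - 8) line works only because the classloader's resource path ends with "classes/" (8 characters), so chopping it off and appending "lib/" lands in WEB-INF/lib. A minimal sketch of that derivation, using an example path of the same shape as the one in the logs later on:

```java
public class LibPathSketch {
    // Derive the WEB-INF/lib jar path from the classes directory the classloader reports.
    // Assumes the path ends with "classes/", as in the submission code above.
    static String libJar(String classesPath, String jarName) {
        String webInf = classesPath.substring(0, classesPath.length() - "classes/".length());
        return webInf + "lib/" + jarName;
    }

    public static void main(String[] args) {
        String tmp = "/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/classes/";
        System.out.println(libJar(tmp, "spark_filter.jar"));
        // → /D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark_filter.jar
    }
}
```

If the deployment layout ever changes, this hard-coded 8-character assumption silently breaks, so it is worth keeping in one place like this.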
Concretely: create a Java web project in MyEclipse and copy spark_filter.jar and spark-assembly-1.4.1-hadoop2.6.0.jar (found in the lib directory of the Spark distribution; the file is large, so copying takes a while) into WebRoot/WEB-INF/lib. (Note: a Java web project serves both tests: to test the plain Java call, run the Java class directly; to test the web application, start Tomcat.)
Java code that calls Scala_Test in spark_filter.jar using YARN mode. Switching to YARN is not just a matter of changing the master to "yarn-client" or "yarn-cluster": that works with spark-shell or spark-submit when HADOOP_CONF_DIR is configured, but here the Hadoop configuration cannot be picked up that way, so the job is submitted through yarn.Client instead. The Java code is as follows:
package test;

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class SubmitScalaJobToYarn {
    public static void main(String[] args) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
        String filename = dateFormat.format(new Date());
        String tmp = Thread.currentThread().getContextClassLoader().getResource("").getPath();
        tmp = tmp.substring(0, tmp.length() - 8); // strip the trailing "classes/" to reach WEB-INF/
        String[] arg0 = new String[]{
                "--name", "test java submit job to yarn",
                "--class", "Scala_Test",
                "--executor-memory", "1G",
                "--jar", tmp + "lib/spark_filter.jar",
                "--arg", "hdfs://node101:8020/user/root/log.txt",
                "--arg", "hdfs://node101:8020/user/root/badLines_yarn_" + filename,
                "--addJars", "hdfs://node101:8020/user/root/servlet-api.jar",
                "--archives", "hdfs://node101:8020/user/root/servlet-api.jar"
        };
        Configuration conf = new Configuration();
        String os = System.getProperty("os.name");
        boolean crossPlatform = os.contains("Windows");
        conf.setBoolean("mapreduce.app-submission.cross-platform", crossPlatform); // enable cross-platform submission
        conf.set("fs.defaultFS", "hdfs://node101:8020");                     // namenode address
        conf.set("mapreduce.framework.name", "yarn");                        // use the YARN framework
        conf.set("yarn.resourcemanager.address", "node101:8032");            // resourcemanager address
        conf.set("yarn.resourcemanager.scheduler.address", "node101:8030");  // scheduler address
        conf.set("mapreduce.jobhistory.address", "node101:10020");           // job history server
        System.setProperty("SPARK_YARN_MODE", "true");
        SparkConf sparkConf = new SparkConf();
        ClientArguments cArgs = new ClientArguments(arg0, sparkConf);
        new Client(cArgs, conf, sparkConf).run();
    }
}
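Both submission classes make the output path unique with a timestamp. Note that the pattern uses hh (12-hour clock), so two runs twelve hours apart would collide on the hour field; HH would give the 24-hour value. A small sketch with a fixed instant so the result is reproducible:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class OutputPathSketch {
    public static void main(String[] args) {
        // Same pattern as in the submission code; "hh" is the 12-hour clock, "HH" the 24-hour one.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        Date d = new Date(0L); // fixed instant (1970-01-01 00:00:00 UTC) for a reproducible result
        String path = "hdfs://node101:8020/user/root/badLines_yarn_" + fmt.format(d);
        System.out.println(path);
        // → hdfs://node101:8020/user/root/badLines_yarn_1970-01-01-12-00-00
        //   note: midnight prints as "12" because of the 12-hour "hh" pattern
    }
}
```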
3. Java web test: the two submission modes are exposed in the simplest way possible, as plain servlets. The web.xml file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<web-app version="3.0"
    xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
  <servlet>
    <description>This is the description of my J2EE component</description>
    <display-name>This is the display name of my J2EE component</display-name>
    <servlet-name>SparkServlet</servlet-name>
    <servlet-class>servlet.SparkServlet</servlet-class>
  </servlet>
  <servlet>
    <description>This is the description of my J2EE component</description>
    <display-name>This is the display name of my J2EE component</display-name>
    <servlet-name>YarnServlet</servlet-name>
    <servlet-class>servlet.YarnServlet</servlet-class>
  </servlet>
  <servlet-mapping>
    <servlet-name>SparkServlet</servlet-name>
    <url-pattern>/servlet/SparkServlet</url-pattern>
  </servlet-mapping>
  <servlet-mapping>
    <servlet-name>YarnServlet</servlet-name>
    <url-pattern>/servlet/YarnServlet</url-pattern>
  </servlet-mapping>
</web-app>
SparkServlet is as follows:
package servlet;

import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import test.SubmitScalaJobToSpark;

public class SparkServlet extends HttpServlet {

    public SparkServlet() {
        super();
    }

    public void destroy() {
        super.destroy();
    }

    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        this.doPost(request, response);
    }

    public void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        System.out.println("Starting the SubmitScalaJobToSpark call...");
        SubmitScalaJobToSpark.main(null);
        // YarnServlet differs only in the class invoked here
        System.out.println("Finished the SubmitScalaJobToSpark call!");
        response.setContentType("text/html");
        PrintWriter out = response.getWriter();
        out.println("<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">");
        out.println("<HTML>");
        out.println("  <HEAD><TITLE>A Servlet</TITLE></HEAD>");
        out.println("  <BODY>");
        out.print("    This is ");
        out.print(this.getClass());
        out.println(", using the POST method");
        out.println("  </BODY>");
        out.println("</HTML>");
        out.flush();
        out.close();
    }

    public void init() throws ServletException {
    }
}
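One caveat with this servlet: SparkSubmit.main and Client.run block the request thread for the whole duration of the job, so the HTTP response is not written until the job finishes. A hypothetical variation (not what the test project does) is to push the submission onto a background executor and return immediately:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncSubmitSketch {
    // A single worker thread, so concurrent requests queue their jobs
    // instead of launching several Spark submissions in parallel.
    private static final ExecutorService SUBMITTER = Executors.newSingleThreadExecutor();

    // The servlet's doPost would call this and return at once; the job
    // (e.g. SubmitScalaJobToSpark.main(null)) runs on the worker thread.
    static void submitAsync(Runnable job) {
        SUBMITTER.submit(job);
    }

    public static void main(String[] args) throws Exception {
        submitAsync(() -> System.out.println("job submitted in background"));
        SUBMITTER.shutdown();
        SUBMITTER.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The trade-off is that the caller no longer learns the job's outcome from the HTTP response, so some job-status tracking would be needed.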
The servlet simply calls the job-submission class written earlier; SparkServlet and YarnServlet differ only in which class they invoke.
For the web test, first test directly from MyEclipse, then copy the project's WebRoot directory to CentOS 7 and run it under Tomcat there.
3. Summary and issues
1. Test results: 1> Submitting jobs directly from Java code to Spark and to YARN to filter the log file works; the relevant information shows up in the YARN and Spark monitoring UIs:
The output files can also be seen in HDFS:
2> For Java web submission to Spark and YARN, the javax.servlet folder must first be deleted from spark-assembly-1.4.1-hadoop2.6.0.jar, because it conflicts with Tomcat's servlet-api.jar.
a. Starting Tomcat on Windows and on Linux and submitting to Spark standalone works;
b. Starting Tomcat on Windows and on Linux and submitting to YARN fails;
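Deleting the javax/servlet folder from the assembly jar can be done with any zip tool (e.g. `zip -d`). A self-contained sketch of the same operation using the JDK's zip file system, run against a throwaway demo jar rather than the real assembly:

```java
import java.net.URI;
import java.nio.file.*;
import java.util.HashMap;
import java.util.Map;

public class StripServletDemo {
    public static void main(String[] args) throws Exception {
        // Build a throwaway demo jar with a javax/servlet entry plus one other class.
        Path jar = Files.createTempFile("assembly-demo", ".jar");
        Files.delete(jar); // the zip filesystem must create the archive itself
        URI uri = URI.create("jar:" + jar.toUri());
        Map<String, String> create = new HashMap<>();
        create.put("create", "true");
        try (FileSystem fs = FileSystems.newFileSystem(uri, create)) {
            Files.createDirectories(fs.getPath("/javax/servlet"));
            Files.write(fs.getPath("/javax/servlet/Servlet.class"), new byte[]{0});
            Files.createDirectories(fs.getPath("/org/apache"));
            Files.write(fs.getPath("/org/apache/Keep.class"), new byte[]{0});
        }
        // Remove the conflicting javax/servlet folder, as was done to the real assembly jar.
        try (FileSystem fs = FileSystems.newFileSystem(uri, new HashMap<String, String>())) {
            Files.delete(fs.getPath("/javax/servlet/Servlet.class"));
            Files.delete(fs.getPath("/javax/servlet"));
        }
        // Verify the servlet entry is gone and everything else is intact.
        try (FileSystem fs = FileSystems.newFileSystem(uri, new HashMap<String, String>())) {
            System.out.println("servlet entry present: "
                    + Files.exists(fs.getPath("/javax/servlet/Servlet.class")));
            System.out.println("other entry present: "
                    + Files.exists(fs.getPath("/org/apache/Keep.class")));
        }
        Files.delete(jar);
    }
}
```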
2. Problems encountered:
1> Java web submission to YARN fails; the key log lines are:
15/08/25 11:35:48 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
This is caused by the javax.servlet package having been deleted because of the conflict with Tomcat. The log also shows:
15/08/26 12:39:27 INFO Client: Setting up container launch context for our AM
15/08/26 12:39:27 INFO Client: Preparing resources for our AM container
15/08/26 12:39:27 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark-assembly-1.4.1-hadoop2.6.0.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark-assembly-1.4.1-hadoop2.6.0.jar
15/08/26 12:39:32 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark_filter.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark_filter.jar
15/08/26 12:39:33 INFO Client: Uploading resource file:/C:/Users/Administrator/AppData/Local/Temp/spark-46820caf-06e0-4c51-a479-3bb35666573f/__hadoop_conf__5465819424276830228.zip -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/__hadoop_conf__5465819424276830228.zip
15/08/26 12:39:33 INFO Client: Source and destination file systems are the same. Not copying hdfs://node101:8020/user/root/servlet-api.jar
15/08/26 12:39:33 WARN Client: Resource hdfs://node101:8020/user/root/servlet-api.jar added multiple times to distributed cache.
During environment initialization two jars are uploaded: spark-assembly-1.4.1-hadoop2.6.0.jar and our own jar. The uploaded assembly jar no longer contains the javax.servlet folder, hence the error. When calling directly from Java (before the javax.servlet folder was deleted), the same upload appears in the log and the job succeeds, which confirms that removing the package folder is indeed the cause. So how can this be fixed?
One attempt: upload servlet-api.jar to HDFS and pass it to yarn.Client through the two most relevant-looking parameters, --addJars and --archives. With both set, the log does show the jar registered as a shared file for the job, but the Java web submission to YARN still fails with the same class-not-found error, so this approach does not work either. (See the deployment section of http://blog.csdn.net/fansy1990/article/details/52289826 for a way to solve this problem.)
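When chasing a NoClassDefFoundError like the one above, a quick diagnostic is to check from inside whichever JVM is failing whether the class is visible to the current classloader at all:

```java
public class ClasspathCheck {
    // Report whether a class can be loaded by the current thread's classloader.
    static boolean visible(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // In a bare JVM without servlet-api.jar on the classpath the first line
        // prints false; inside Tomcat it would print true.
        System.out.println("HttpServletResponse visible: "
                + visible("javax.servlet.http.HttpServletResponse"));
        System.out.println("String visible: " + visible("java.lang.String"));
    }
}
```

Printing such a check from the Spark application's main method (or logging it in the YARN container) pins down exactly which JVM is missing the class.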
Share, grow, enjoy
Stay grounded, stay focused