遠端呼叫Spark平臺中的程式
阿新 • • 發佈:2019-02-01
用scala語言,開發好了在spark平臺上可以一直執行的機器學習模型
現在有個需求:
要遠端呼叫該模型的一些方法並獲取結果
那麼可以使用jetty在伺服器端主節點佔用一個埠然後對外提供http服務
package com.xxx.rec.basic.ccam.jetty
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import com.xxx.rec.basic.ccam.CanonicalCorrelationAnalysisModel
import org.mortbay.jetty.{HttpStatus, Request, Server}
import org.mortbay .jetty.handler._
object CannonicalCorrelationAnalysisModelJerseyServer
extends AbstractHandler{
var model: CanonicalCorrelationAnalysisModel = null
/**
* 處理請求 返回響應
* @param target
* @param request
* @param response
* @param dispatch
*/
override def handle(target: String,
request: HttpServletRequest,
response: HttpServletResponse,
dispatch: Int): Unit = {
val url=request.getRequestURI
url.substring(url.lastIndexOf("/")+1,url.length) match {
case "recommend" => {
//request中的target 用,號分割
val target: Seq[String] = request.getParameter("target").split(",").toSeq
val topNum: Int = request.getParameter("topNum").toInt
val result = model.recommend (target, topNum)
response.setStatus(HttpStatus.ORDINAL_200_OK);
response.getWriter().println(result.mkString(","))
request.asInstanceOf[Request].setHandled(true)
response.getWriter.close()
}
case _ => {
response.setStatus(HttpStatus.ORDINAL_404_Not_Found);
request.asInstanceOf[Request].setHandled(true)
}
}
}
def main(args: Array[String]): Unit = {
import org.apache.spark.{SparkConf, SparkContext}
val sparkConf = new SparkConf().setAppName("CanonicalCorrelationAnalysisModelDemo")
val textFilePath = "file:///home/xxx/xxx.txt"
val sc = new SparkContext(sparkConf)
val data = sc.textFile(textFilePath).map { line =>
line.split(' ')
}.cache()
model = CanonicalCorrelationAnalysisModel.createModel(data, 0.3, 5)
val server=new Server(9998)
server.setHandler(this)
server.start()
}
}
該程式執行後佔用了伺服器端主節點的9998埠,通過http訪問即可