記spark過程中Scala多執行緒小問題
阿新 • • 發佈:2018-12-22
這次更改ThriftServer原始碼,加了些業務,中間遇到這樣一個問題,非同步提交任務的時候想做成多執行緒,剛開始是使用的scala的Actor,傳遞了SQLContext和sql,發現每次sparkSessionId在一直變化,每次提交和觸發Action之後產生的sessionId都不一致,這是怎麼回事,後來才發現是多執行緒非同步的問題,傳遞sqlContext線上程那邊執行任務的時候會重新觸發一個會話,那可怎麼辦呢,只能用以下方式實現了
java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(new Callable<Void>() { @Override public Void call(){ df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class); return null; } }); executorService.submit(new Callable<Void>() { @Override public Void call(){ df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class); return null; } }); executorService.shutdown();
在當前方法內部使用context變數就可以了
當然為了方便大家學習,另外常見的寫法如下:
import java.util.concurrent.{Executors, ExecutorService} object Test { def main(args: Array[String]) { //建立執行緒池 val threadPool:ExecutorService=Executors.newFixedThreadPool(5) try { //提交5個執行緒 for(i <- 1 to 5){ //threadPool.submit(new ThreadDemo("thread"+i)) threadPool.execute(new ThreadDemo("thread"+i)) } }finally { threadPool.shutdown() } } //定義執行緒類,每列印一次睡眠100毫秒 class ThreadDemo(threadName:String) extends Runnable{ override def run(){ for(i <- 1 to 10){ println(threadName+"|"+i) Thread.sleep(100) } } } }
Callable示例
import java.util.concurrent.{Callable, FutureTask, Executors, ExecutorService} object Test { def main(args: Array[String]) { val threadPool:ExecutorService=Executors.newFixedThreadPool(3) try { val future=new FutureTask[String](new Callable[String] { override def call(): String = { Thread.sleep(100) return "im result" } }) threadPool.execute(future) println(future.get()) }finally { threadPool.shutdown() } } }