Spark如何使用Akka實現程序、節點通訊的簡明介紹
《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》
《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》
Akka簡介
Scala認為Java執行緒通過共享資料以及通過鎖來維護共享資料的一致性是糟糕的做法,容易引起鎖的爭用,而且執行緒的上下文切換會帶來不少開銷,降低併發程式的效能,甚至會引入死鎖的問題。在Scala中只需要自定義型別繼承Actor,並且提供act方法,就如同Java裡實現Runnable介面,需要實現run方法一樣。但是不能直接呼叫act方法,而是通過傳送訊息的方式(Scala傳送訊息是非同步的),傳遞資料。如:
Actor ! message
Akka是Actor程式設計模型的高階類庫,類似於JDK 1.5之後越來越豐富的併發工具包,簡化了程式設計師併發程式設計的難度。Akka是一款提供了用於構建高併發的、分散式的、可伸縮的、基於Java虛擬機器的訊息驅動應用的工具集和執行時環境。從下面Akka官網提供的一段程式碼示例,可以看出Akka併發程式設計的簡約。
case class Greeting(who: String) class GreetingActor extends Actor with ActorLogging { def receive = { case Greeting(who) ⇒ log.info("Hello " + who) } } val system = ActorSystem("MySystem") val greeter = system.actorOf(Props[GreetingActor], name = "greeter") greeter ! Greeting("Charlie Parker")
Akka提供了分散式的框架,意味著使用者不需要考慮如何實現分散式部署,Akka官網提供了下面的示例演示如何獲取遠端Actor的引用。
// config on all machines akka { actor { provider = akka.remote.RemoteActorRefProvider deployment { /greeter { remote = akka.tcp://[email protected]:2552 } } } } // ------------------------------// define the greeting actor and the greeting message case class Greeting(who: String) extends Serializable class GreetingActor extends Actor with ActorLogging { def receive = { case Greeting(who) ⇒ log.info("Hello " + who) } } // ------------------------------ // on machine 1: empty system, target for deployment from machine 2 val system = ActorSystem("MySystem") // ------------------------------ // on machine 2: Remote Deployment - deploying on machine1 val system = ActorSystem("MySystem") val greeter = system.actorOf(Props[GreetingActor], name = "greeter") // ------------------------------ // on machine 3: Remote Lookup (logical home of “greeter” is machine2, remote deployment is transparent) val system = ActorSystem("MySystem") val greeter = system.actorSelection("akka.tcp://[email protected]:2552/user/greeter") greeter ! Greeting("Sonny Rollins")
Actor之間最終會構成一棵樹,作為父親的Actor應當對所有兒子的異常失敗進行處理(監管)Akka給出了簡單的示例,程式碼如下。
class Supervisor extends Actor { override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { case _: ArithmeticException ⇒ Resume case _: NullPointerException ⇒ Restart case _: Exception ⇒ Escalate } val worker = context.actorOf(Props[Worker]) def receive = { case n: Int => worker forward n } }
Akka的更多資訊請訪問官方網站:http://akka.io/
基於Akka的分散式訊息系統ActorSystem
Spark使用Akka提供的訊息系統實現併發:ActorSystem是Spark中最基礎的設施,Spark既使用它傳送分散式訊息,又用它實現併發程式設計。正是因為Actor輕量級的併發程式設計、訊息傳送以及ActorSystem支援分散式訊息傳送等特點,Spark選擇了ActorSystem。
SparkEnv中建立ActorSystem時用到了AkkaUtils工具類,程式碼如下。
val (actorSystem, boundPort) = Option(defaultActorSystem) match { case Some(as) => (as, port) case None => val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager) }
AkkaUtils.createActorSystem方法用於啟動ActorSystem,程式碼如下。
def createActorSystem( name: String, host: String, port: Int, conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val startService: Int => (ActorSystem, Int) = { actualPort => doCreateActorSystem(name, host, actualPort, conf, securityManager) } Utils.startServiceOnPort(port, startService, conf, name) }
AkkaUtils使用了Utils的靜態方法startServiceOnPort, startServiceOnPort最終會回撥方法startService: Int=> (T, Int),此處的startService實際是方法doCreateActorSystem。真正啟動ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具體實現細節請見AkkaUtils的詳細介紹。關於startServiceOnPort的實現,請參閱[《Spark中常用工具類Utils的簡明介紹》](http://blog.csdn.net/beliefer/article/details/50904662)一文的內容。
AkkaUtils
AkkaUtils是Spark對Akka相關API的又一層封裝,這裡對其常用的功能進行介紹。
(1)doCreateActorSystem
功能描述:建立ActorSystem。
private def doCreateActorSystem( name: String, host: String, port: Int, conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) val akkaTimeout = conf.getInt("spark.akka.timeout", 100) val akkaFrameSize = maxFrameSizeBytes(conf) val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" if (!akkaLogLifecycleEvents) { Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL)) } val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000) val akkaFailureDetector = conf.getDouble("spark.akka.failure-detector.threshold", 300.0) val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000) val secretKey = securityManager.getSecretKey() val isAuthOn = securityManager.isAuthenticationEnabled() if (isAuthOn && secretKey == null) { throw new Exception("Secret key is null with authentication on") } val requireCookie = if (isAuthOn) "on" else "off" val secureCookie = if (isAuthOn) secretKey else "" logDebug("In createActorSystem, requireCookie is: " + requireCookie) val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] |akka.stdout-loglevel = "ERROR" |akka.jvm-exit-on-fatal-error = off |akka.remote.require-cookie = "$requireCookie" |akka.remote.secure-cookie = "$secureCookie" |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = $port |akka.remote.netty.tcp.tcp-nodelay = on |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B |akka.remote.netty.tcp.execution-pool-size = $akkaThreads |akka.actor.default-dispatcher.throughput = $akkaBatchSize |akka.log-config-on-start = $logAkkaConfig |akka.remote.log-remote-lifecycle-events = $lifecycleEvents |akka.log-dead-letters = $lifecycleEvents |akka.log-dead-letters-during-shutdown = $lifecycleEvents """.stripMargin)) val actorSystem = ActorSystem(name, akkaConf) val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.getDefaultAddress.port.get (actorSystem, boundPort) }
(2)makeDriverRef
功能描述:從遠端ActorSystem中查詢已經註冊的某個Actor。
def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = { val driverActorSystemName = SparkEnv.driverActorSystemName val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") val url = s"akka.tcp://[email protected]$driverHost:$driverPort/user/$name" val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) }