Scala學習之路 (十)Scala的Actor
一、Scala中的並發編程
1、Java中的並發編程
①Java中的並發編程基本上滿足了事件之間相互獨立,但是事件能夠同時發生的場景的需要。
②Java中的並發編程是基於共享數據和加鎖的一種機制,即會有一個共享的數據,然後有若幹個線程去訪問這個共享的數據(主要是對這個共享的數據進行修改),同時Java利用加鎖的機制(即synchronized)來確保同一時間只有一個線程對我們的共享數據進行訪問,進而保證共享數據的一致性。
③Java中的並發編程存在資源爭奪和死鎖等多種問題,因此程序越大問題越麻煩。
2、Scala中的並發編程
①Scala中的並發編程思想與Java中的並發編程思想完全不一樣,Scala中的Actor是一種不共享數據,依賴於消息傳遞的一種並發編程模式, 避免了死鎖、資源爭奪等情況。在具體實現的過程中,Scala中的Actor會不斷的循環自己的郵箱,並通過receive偏函數進行消息的模式匹配並進行相應的處理。
②如果Actor A和 Actor B要相互溝通的話,首先A要給B傳遞一個消息,B會有一個收件箱,然後B會不斷的循環自己的收件箱, 若看見A發過來的消息,B就會解析A的消息並執行,處理完之後就有可能將處理的結果通過郵件的方式發送給A。
二、Scala中的Actor
1、什麽是Actor
一個actor是一個容器,它包含 狀態, 行為,信箱,子Actor 和 監管策略,所有這些包含在一個ActorReference(Actor引用)裏。一個actor需要與外界隔離才能從actor模型中獲益,所以actor是以actor引用的形式展現給外界的。
2、ActorSystem的層次結構
如果一個Actor中的業務邏輯非常復雜,為了降低代碼的復雜度,可以將其拆分成多個子任務(在一個actor的內部可以創建一個或多個actor,actor的創建者也是該actor的監控者)
一個ActorSystem應該被正確規劃,例如哪一個Actor負責監控,監控什麽等等:
- 負責分發的actor管理接受任務的actor
-
擁有重要數據的actor,找出所有可能丟失數據的子actor,並且處理他們的錯誤。
3、ActorPath
ActorPath是通過字符串描述Actor的層級關系,並唯一標識一個Actor的方法。
ActorPath包含協議,位置 和 Actor層級關系
//本地path "akka://my-sys/user/service-a/worker1" //遠程path akka.tcp://(ActorSystem的名稱)@(遠程地址的IP):(遠程地址的端口)/user/(Actor的名稱) "akka.tcp://[email protected]:5678/user/service-b" //akka集群 "cluster://my-cluster/service-c"
遠程地址不清楚是多少的話,可以在遠程的服務啟動的時候查看
4、獲取Actor Reference
獲取Actor引用的方式有兩種:創建 和 查找。
要創建Actor,可以調用ActorSystem.actorOf(..),它創建的actor在guardian actor之下,接著可以調用ActorContext的actorOf(…) 在剛才創建的Actor內生成一個actor樹。這些方法會返回新創建的actor的引用,每一個actor都可以通過訪問ActorContext來獲得自己(self),子Actor(children,child)和父actor(parent)。
要查找Actor Reference,可以調用ActorSystem或ActorContext的actorSelection(“path”),在查找ActorRef時,可以使用相對路徑或絕對路徑,如果是相對路徑,可以用 .. 來表示parent actor。
actorOf / actorSelection / actorFor的區別
actorOf 創建一個新的actor,創建的actor為調用該方法所屬的context的直接子actor。
actorSelection 查找現有actor,並不會創建新的actor。
actorFor 查找現有actor,不創建新的actor,已過時。
5、Actor和ActorSystem
Actor:
就是用來做消息傳遞的
用來接收和發送消息的,一個actor就相當於是一個老師或者是學生。
如果我們想要多個老師,或者學生,就需要創建多個actor實例。
ActorSystem:
用來創建和管理actor,並且還需要監控Actor。ActorSystem是單例的(object)
在同一個進程裏面,只需要一個ActorSystem就可以了
三、Actor的示例
1、示例說明
2、代碼實現
MyResourceManager.scala(服務端)
package com.rpc import akka.actor._ import com.typesafe.config.{Config, ConfigFactory} import scala.collection.mutable class MyResourceManager(var resourceManagerHostName:String, var resourceManagerPort:Int) extends Actor { /** * 定義一個Map,接受MyNodeManager的註冊信息,key是主機名, * value是NodeManagerInfo對象,裏面存儲主機名、CPU和內存信息 * */ var registerMap = new mutable.HashMap[String,NodeManagerInfo]() /** * 定義一個Set,接受MyNodeManager的註冊信息,key是主機名, * value是NodeManagerInfo對象,裏面存儲主機名、CPU和內存信息 * 實際上和上面的Map裏面存檔內容一樣,容易變歷,可以不用寫,主要是模仿後面Spark裏面的內容 * 方便到時理解Spark源碼 * */ var registerSet = new mutable.HashSet[NodeManagerInfo]() override def preStart(): Unit = { import scala.concurrent.duration._ import context.dispatcher context.system.scheduler.schedule(0 millis, 5000 millis, self,CheckTimeOut) } //對MyNodeManager傳過來的信息進行匹配 override def receive: Receive = { //匹配到NodeManager的註冊信息進行對應處理 case NodeManagerRegisterMsg(nodeManagerID,cpu,memory) => { //將註冊信息實例化為一個NodeManagerInfo對象 val registerMsg = new NodeManagerInfo(nodeManagerID,cpu,memory) //將註冊信息存儲到registerMap和registerSet裏面,key是主機名,value是NodeManagerInfo對象 registerMap.put(nodeManagerID,registerMsg) registerSet += registerMsg //註冊成功之後,反饋個MyNodeManager一個成功的信息 sender() ! new RegisterFeedbackMsg("註冊成功!" + resourceManagerHostName+":"+resourceManagerPort) } //匹配到心跳信息做相應處理 case HeartBeat(nodeManagerID) => { //獲取當前時間 val time:Long = System.currentTimeMillis() //根據nodeManagerID獲取NodeManagerInfo對象 val info = registerMap(nodeManagerID) info.lastHeartBeatTime = time //更新registerMap和registerSet裏面nodeManagerID對應的NodeManagerInfo對象信息(最後一次心跳時間) registerMap(nodeManagerID) = info registerSet += info } //檢測超時,對超時的數據從集合中刪除 case CheckTimeOut => { var time = System.currentTimeMillis() registerSet .filter( nm => time - nm.lastHeartBeatTime > 10000) .foreach(deadnm => { registerSet -= deadnm registerMap.remove(deadnm.nodeManagerID) }) println("當前註冊成功的節點數:" + registerMap.size) } } } object MyResourceManager { def main(args: Array[String]): Unit = { /** * 傳參: * ResourceManager的主機地址、端口號 * */ val RM_HOSTNAME = args(0) val RM_PORT = args(1).toInt val str:String = """ |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname =localhost |akka.remote.netty.tcp.port=19888 """.stripMargin val conf: Config = ConfigFactory.parseString(str) val actorSystem = ActorSystem(Conf.RMAS,conf) actorSystem.actorOf(Props(new MyResourceManager(RM_HOSTNAME,RM_PORT)),Conf.RMA) } }View Code
MyNodeManager.scala(客戶端)
package com.rpc import java.util.UUID import akka.actor._ import com.typesafe.config.{Config, ConfigFactory} class MyNodeManager(resourceManagerHostName:String,resourceManagerPort:Int,cpu:Int,memory:Int) extends Actor{ //MyNodeManager的UUID var nodeManagerID:String = _ var rmref:ActorSelection = _ override def preStart(): Unit = { //獲取MyResourceManager的Actor的引用 rmref = context.actorSelection(s"akka.tcp://${Conf.RMAS}@${resourceManagerHostName}:${resourceManagerPort}/user/${Conf.RMA}") //生成隨機的UUID nodeManagerID = UUID.randomUUID().toString /** * 向MyResourceManager發送註冊信息 * */ rmref ! NodeManagerRegisterMsg(nodeManagerID,cpu,memory) } //進行信息匹配 override def receive: Receive = { //匹配到註冊成功之後MyResourceManager反饋回的信息,進行相應處理 case RegisterFeedbackMsg(feedbackMsg) => { /** * initialDelay: FiniteDuration, 多久以後開始執行 * interval: FiniteDuration, 每隔多長時間執行一次 * receiver: ActorRef, 給誰發送這個消息 * message: Any 發送的消息是啥 */ //定時任務需要導入的工具包 import scala.concurrent.duration._ import context.dispatcher //定時向自己發送信息 context.system.scheduler.schedule(0 millis, 3000 millis, self, SendMessage) } //匹配到SendMessage信息之後做相應處理 case SendMessage => { //向MyResourceManager發送心跳信息 rmref ! HeartBeat(nodeManagerID) println(Thread.currentThread().getId + ":" + System.currentTimeMillis()) } } } object MyNodeManager { def main(args: Array[String]): Unit = { /** * 傳參: * NodeManager的主機地址、端口號、CPU、內存 * ResourceManager的主機地址、端口號 * */ val NM_HOSTNAME = args(0) val NM_PORT = args(1) val NM_CPU:Int = args(2).toInt val NM_MEMORY:Int = args(3).toInt val RM_HOSTNAME = args(4) val RM_PORT = args(5).toInt val str:String = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = ${NM_HOSTNAME} |akka.remote.netty.tcp.port = ${NM_PORT} """.stripMargin val conf: Config = ConfigFactory.parseString(str) val actorSystem = ActorSystem(Conf.NMAS,conf) actorSystem.actorOf(Props(new MyNodeManager(RM_HOSTNAME,RM_PORT,NM_CPU,NM_MEMORY)),Conf.NMA) } }View Code
Conf.scala(配置文件)
package com.rpc //避免硬編碼 object Conf { //ResourceManagerActorSystem val RMAS = "MyRMActorSystem" //ResourceManagerActor val RMA = "MyRMActor" //NodeManagerActorSystem val NMAS = "MyNMActorSystem" //NodeManagerActor val NMA = "MyNMactor" }View Code
Message.scala
package com.rpc //NodeManager註冊信息 case class NodeManagerRegisterMsg(val nodeManagerID:String, var cpu:Int, var memory:Int) //ResourceManager接收到註冊信息成功之後的返回信息 case class RegisterFeedbackMsg(val feedbackMsg: String) //NodeManager的心跳信息 case class HeartBeat(val nodeManagerID:String) //NodeManager註冊信息 class NodeManagerInfo(val nodeManagerID:String, var cpu:Int, var memory:Int){ //定義一個屬性,存儲上一次的心跳時間 var lastHeartBeatTime:Long = _ } case object SendMessage case object CheckTimeOutView Code
3、運行
(1)運行MyResourceManager
運行結果
發現報錯數組越界,原因是在啟動時需要傳入2個參數
重新啟動,啟動成功
2、運行MyNodeManager
報相同的錯誤,不過此處需要傳入6個參數
重新啟動,啟動成功
3、觀察MyResourceManager
發現有一個節點連接成功
4、再啟動一個MyNodeManager觀察情況
先修改MyNodeManager配置裏面的端口
再啟動
啟動成功之後觀察MyResourceManager,此時有2個節點連接成功
5、關閉一個節點,觀察情況
集合中連接超時的成功刪除
Scala學習之路 (十)Scala的Actor