使用scala的actor模型實現併發的例子
阿新 • • 發佈:2018-12-25
</pre><pre code_snippet_id="1793449" snippet_file_name="blog_20160729_3_9097610" name="code" class="java">/** * Created by lengmengwuxie on 2016/7/29. */ import scala.actors._ import scala.actors.Actor._ //設計方式: // 1,任何模擬物件在所有其他模擬物件完成對時間n的處理之前,都不應該處理時間n+1的事件 // 2,假設要實現所有模擬物件同步執行,使用一個“時鐘”actor來跟蹤當前的時間 //在選定的時間去傳送Ping訊息到所有actor 檢查是否當前時間點的事件都已經被完成 case class Ping(time: Int) //模擬物件確認已經處理完畢 返回Pong事件 case class Pong(time: Int, from: Actor) //3,為了模擬物件能夠切確知道自己已經完成了當前時間點的工作,而不是還需要等待與其他actor之間的 //訊息反饋,需要增加兩個限制: // a,模擬物件從不直接相互發送訊息,而是相互安排事件日程; // b,從不向當前時間點提交時間,而是向未來至少比當前多一點的時間提交; //因此我們需要一個工作項的日程表,這個日程表也可以在“時鐘”actor上,時鐘actor先為所有模擬物件 //傳送當前時間點所有的工作項的請求之後,才傳送Ping訊息。 //工作項訊息 case class WorkItem(time: Int, msg: Any, target: Actor) //安排新工作的訊息 case class AfterDelay(delay: Int, msg: Any, target: Actor) //用於要求模型啟動和停止的訊息 case object Start case object Stop //時鐘actor class Clock extends Actor{ private var running = false private var currentTime = 0 private var agenda: List[WorkItem] = List() private var allSimulants: List[Actor] = List() private var busySimulants: Set[Actor] = Set.empty def add(sim: Simulant): Unit ={ allSimulants = sim :: allSimulants } def act(): Unit ={ loop{ if (running && busySimulants.isEmpty) advance() reactToOneMessage() } } //時間前進 def advance(): Unit ={ //日程表為空 且模擬已經開始了則模擬需要退出 if (agenda.isEmpty && currentTime > 0){ println("** Agenda empty. Clock exiting at time " + currentTime + ".") self ! Stop return } currentTime += 1 println("Advacting to time" + currentTime) processCurrentEvents() for (sim <- allSimulants) //向所有工作中的模擬物件傳送Ping sim ! Ping(currentTime) busySimulants = Set.empty ++ allSimulants } //處理所有在日程表裡時間為currentTime的事件 private def processCurrentEvents():Unit = { val todoNow = agenda.takeWhile(_.time <= currentTime) //獲取agenda中所有時間等於currentTime的事件 agenda = agenda.drop(todoNow.length) //從agenda中去掉與todoNow包含的條目 for(WorkItem(time, msg, target) <- todoNow){ assert(time == currentTime) target ! msg } } //react事件處理 def reactToOneMessage(): Unit ={ react{ case AfterDelay(delay, msg, target) => //將新的條目新增到工作佇列 val item = WorkItem(currentTime + delay, msg, target) agenda = insert(agenda, item) case Pong(time, sim) => //從忙碌的模擬物件中去除一個模擬物件 assert(time == currentTime) assert(busySimulants contains sim) busySimulants -= sim case Start => running = true //讓模擬開始 case Stop => //讓時鐘停止 for (sim <- allSimulants) sim ! Stop exit() } } def insert(ag: List[WorkItem], item: WorkItem): List[WorkItem] ={ if (ag.isEmpty || item.time > ag.head.time) item :: ag else ag.head :: insert(ag.tail, item) } } //不同的被模擬物件之間有的共同行為,將其定義為特質 //simulant是能夠接受模擬訊息Stop和Ping並於他們合作的任何actor trait Simulant extends Actor { val clock: Clock def handleSimMessage(msg: Any) def simStarting() {} def act(): Unit = { loop { react { case Stop => exit() case Ping(time) => if (time == 1) simStarting() clock ! Pong(time, self) case msg => handleSimMessage(msg) } } } //模擬物件在建立時便啟動執行,安全且方便,在接受到時鐘訊息之前不會做任何事 start() }