1. 程式人生 > >使用scala的actor模型實現併發的例子

使用scala的actor模型實現併發的例子

</pre><pre code_snippet_id="1793449" snippet_file_name="blog_20160729_3_9097610" name="code" class="java">/**
* Created by lengmengwuxie on 2016/7/29.
*/
import scala.actors._
import scala.actors.Actor._


//設計方式:
// 1,任何模擬物件在所有其他模擬物件完成對時間n的處理之前,都不應該處理時間n+1的事件
// 2,假設要實現所有模擬物件同步執行,使用一個“時鐘”actor來跟蹤當前的時間

//在選定的時間去傳送Ping訊息到所有actor 檢查是否當前時間點的事件都已經被完成
case class Ping(time: Int)
//模擬物件確認已經處理完畢 返回Pong事件
case class Pong(time: Int, from: Actor)

//3,為了模擬物件能夠切確知道自己已經完成了當前時間點的工作,而不是還需要等待與其他actor之間的
//訊息反饋,需要增加兩個限制:
//    a,模擬物件從不直接相互發送訊息,而是相互安排事件日程;
//    b,從不向當前時間點提交時間,而是向未來至少比當前多一點的時間提交;
//因此我們需要一個工作項的日程表,這個日程表也可以在“時鐘”actor上,時鐘actor先為所有模擬物件
//傳送當前時間點所有的工作項的請求之後,才傳送Ping訊息。

//工作項訊息
case class WorkItem(time: Int, msg: Any, target: Actor)
//安排新工作的訊息
case class AfterDelay(delay: Int, msg: Any, target: Actor)
//用於要求模型啟動和停止的訊息
case object Start
case object Stop

//時鐘actor
class Clock extends Actor{
  private var running = false
  private var currentTime = 0
  private var agenda: List[WorkItem] = List()
  private var allSimulants: List[Actor] = List()
  private var busySimulants: Set[Actor] = Set.empty

  def add(sim: Simulant): Unit ={
    allSimulants = sim :: allSimulants
  }

  def act(): Unit ={
    loop{
      if (running && busySimulants.isEmpty)
        advance()
      reactToOneMessage()
    }
  }

  //時間前進
  def advance(): Unit ={
    //日程表為空 且模擬已經開始了則模擬需要退出
    if (agenda.isEmpty && currentTime > 0){
      println("** Agenda empty. Clock exiting at time " + currentTime + ".")
      self ! Stop
      return
    }
    currentTime += 1
    println("Advacting to time" + currentTime)

    processCurrentEvents()
    for (sim <- allSimulants) //向所有工作中的模擬物件傳送Ping
      sim ! Ping(currentTime)

    busySimulants = Set.empty ++ allSimulants
  }

  //處理所有在日程表裡時間為currentTime的事件
  private def processCurrentEvents():Unit = {
    val todoNow = agenda.takeWhile(_.time <= currentTime) //獲取agenda中所有時間等於currentTime的事件
    agenda = agenda.drop(todoNow.length) //從agenda中去掉與todoNow包含的條目
    for(WorkItem(time, msg, target) <- todoNow){
      assert(time == currentTime)
      target ! msg
    }
  }

  //react事件處理
  def reactToOneMessage(): Unit ={
    react{
      case AfterDelay(delay, msg, target) =>      //將新的條目新增到工作佇列
        val item = WorkItem(currentTime + delay, msg, target)
        agenda = insert(agenda, item)
      case Pong(time, sim) =>                     //從忙碌的模擬物件中去除一個模擬物件
        assert(time == currentTime)
        assert(busySimulants contains sim)
        busySimulants -= sim
      case Start => running = true                //讓模擬開始
      case Stop =>                                //讓時鐘停止
        for (sim <- allSimulants)
          sim ! Stop
        exit()
    }
  }

  def insert(ag: List[WorkItem], item: WorkItem): List[WorkItem] ={
    if (ag.isEmpty || item.time > ag.head.time) item :: ag
    else ag.head :: insert(ag.tail, item)
  }
}

//不同的被模擬物件之間有的共同行為,將其定義為特質
//simulant是能夠接受模擬訊息Stop和Ping並於他們合作的任何actor
trait Simulant extends Actor {
  val clock: Clock
  def handleSimMessage(msg: Any)
  def simStarting() {}

  def act(): Unit = {
    loop {
      react {
        case Stop => exit()
        case Ping(time) =>
          if (time == 1) simStarting()
          clock ! Pong(time, self)
        case msg => handleSimMessage(msg)
      }
    }
  }
  //模擬物件在建立時便啟動執行,安全且方便,在接受到時鐘訊息之前不會做任何事
  start()
}