1. 程式人生 > >Scala程式設計實戰 RPC

Scala程式設計實戰 RPC

1.  熟練使用Scala編寫程式

2. 專案概述 

2.1 需求 

目前大多數的分散式架構層通訊都是通過RPC實現的,RPC架構非常多,比如Hadoop專案的RPC通訊框架,但是Hadoo在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重  Spark的RPC是通過AKKa類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具有高可靠、高效能、可擴充套件等特點,使用Akka可以輕鬆實現分散式RPC功能。   

2.2 Akka簡介 Akka基於Actor模型,提供了一個用於構建可擴充套件(Scalable),彈性的(Resilient),快速響應的(Responsive)應用程式的平臺。  Actor模型:在電腦科學領域,Actor模型是一個平行計算(ConCurrent Computation)模型,它把actor做出一些決策,如建立更多的actor,或傳送更多的actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。

Actor是一個Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每一個Actor都有自己的收件箱(MailBox)。通過Actor能夠簡化鎖及執行緒還禮,可以非常的開發出正確併發程式和並行系統,Actor具有如下特性:

1.提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(parallelism)應用場景下的程式設計開發

 2. 提供了非同步非阻塞的,高效能的時間驅動程式設計模型

3.超級了輕量級事件處理(每GB堆記憶體幾百萬Actor)   

3. 專案實現

3.1 架構圖

3.2  重要類介紹   

3.2.1 ActorSystem

在Akka中,ActorSystem是一個重要級的結構,需要分配多個執行緒,所以在實際應用中,ActorSystem通常是一個單例物件,可以使用這個ActorSystem建立很多Actor.

3.2.2 Actor 

在Akka中,Actor負責通訊,在Actor中有一些重要的生命週期方法 

1.preStart()方法:該方法Actor物件構造方法執行後執行,整個Actor生命週期中僅執行一次。

2.receive()方法:該方法在Actor的preStart方法執行完成後執行,用於接收訊息,會被返回執行。

 

1.WorkInfo類

package com.zhiyou100.ScalaActor_akka.Worker
/*用來封裝master接受的worker的資訊
   @param id
   @param mem
   @param  cores
 */
class WorkInfo(val id:String,val mem:Int,val cores:Int) {
    var LastBeat :Long = _
}

2.IntervalMeaage類

package com.zhiyou100.ScalaActor_akka.Worker
/*互動訊息:用來封裝Master與Worker進行互動的訊息
  1.註冊  2.Master的反饋 3.心跳 4.檢查資訊(時間超時)
  這裡面都是樣例類,因為在網路傳輸所以必須實現序列化
 */
trait IntervalMeaage extends Serializable     //抽象類
  //註冊樣例類,封裝了從Worker到Master的註冊訊息

  case class  RegisterWorker(id:String,mem:Int,cores:Int) extends IntervalMeaage  //mem  記憶體   core核數
 //註冊成功確認訊息
  case class RegisterFinish(masterURL:String) extends  IntervalMeaage
  //傳送心跳   worker -->worker
  case object SendHeartBeat
//傳送心跳 worker -> master
   case class  HeartBeat (id:String) extends IntervalMeaage
//超時檢查  master -->master
case object CheckTimeOut

 

3.Master類

package com.zhiyou100.ScalaActor_akka.Worker

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
import  scala.concurrent.duration._

class Master extends Actor{
  //用來儲存worker傳輸過來的資訊,value最好一個實體類,以後方便使用其中屬性
  val ids=new mutable.HashMap[String,WorkInfo]()
  //儲存map中的workInfo的集合,方便以後屬性的排序
  val workers=new mutable.HashSet[WorkInfo]()
  val CHECK_BEAT=2500
  // 生命週期之啟動Actor     contrl+o
  override def preStart():Unit ={
  println("Actor is preStart")
    //啟動定時器,檢查worker心跳是否正常
    import context.dispatcher

    context.system.scheduler.schedule(0.millis,CHECK_BEAT.millis,self,CheckTimeOut)
  }

  //用於接收訊息
  override def receive:Receive = {
    case "connect" =>{
      println("success")
      sender ! "reply"
    }
    case "hello" =>{
      println("1111" )
    }
    case RegisterWorker(id,mem,cores) =>{
      //判斷一下worker是否註冊過
      if (!ids.contains(id)){
        //將封裝例項化
       val workInfo = new WorkInfo(id,mem,cores)
        //儲存資料策略: 1.儲存記憶體 2.持久化到磁碟 3. 儲存到zookeeper
      //map的put的操作
        ids (id) =workInfo
        //set的追加操作
        workers += workInfo
        //建立樣例類傳送註冊成功確認訊息
        sender ! RegisterFinish("akka.tcp://[email protected]$host:$port/user/Master")
      }
    }
    case HeartBeat(id) =>{
      if (ids.contains(id)){
        val workInfo =ids(id)
        val currentTime=System.currentTimeMillis()
        //把心跳時間用當前的時間置換
        workInfo.LastBeat =currentTime
      }
    }
    case CheckTimeOut =>{
   val concurrentTime=System.currentTimeMillis()
    val toClean =  workers.filter(x=> concurrentTime -x.LastBeat >CHECK_BEAT)
      for (w<- toClean){
         workers -=w
        ids -=w.id
      }
      println("活著的wpoker數量:"+workers.size)
    }
  }
}
//老大
object  Master {
  def main(args: Array[String]): Unit = {
    val host=args(0)
    val port=args(1).toInt
    //準備配置檔案
    val config =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
     """.stripMargin
    val cfg =  ConfigFactory.parseString(config)   //解析配置檔案
   //actorSystem :Actor的領導,監控所有的actor,singletong
    val actorSystem =ActorSystem("MasterSystem",cfg)
    //建立actor
   val master = actorSystem.actorOf(Props[Master],name="Master")
    master ! "success"
    actorSystem.awaitTermination()  //讓程序等待,不結束

  }
}

4. Worker類

package com.zhiyou100.ScalaActor_akka.Worker

import java.util.UUID
import  scala.concurrent.duration._
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class Worker (val masterHost:String,val masterPort: Int,val mem:Int,val cores:Int)
                                            extends Actor{
  var master:ActorSelection=_
  val workerID=UUID.randomUUID().toString
  val HEART_BEAT=2000
  //建立連線
  override def preStart(): Unit = {
    //引數需要有/user/Master
   master= context.actorSelection(s"akka.tcp://[email protected]$masterHost:$masterPort/user/Master")
  // master ! "connect"
    //向Master傳送註冊訊息,Master的receive方法中的case class去接受
    master ! RegisterWorker(workerID,mem,cores)
  }

  override def receive : Receive ={
    //worker收到master返回的確認訊息
    case RegisterFinish (masterURL) =>{
      println(masterURL)
      //定時傳送心跳
      //匯入影視轉換
       import context.dispatcher
      context.system.scheduler.schedule(0.millis,HEART_BEAT.millis,self,SendHeartBeat)

      //schedule(0.millis ,HEART_BEAT.millis,self,SendHeartBeat)
    }
 //接收自己傳送給自己的心跳,然後在傳送給Master

    case SendHeartBeat=>{
      println("傳送心跳的Master")
      master ! HeartBeat(workerID)
    }
    case "reply" =>{
      println("haode")
    }
  }
}
  object Worker{
    def main(args: Array[String]): Unit = {
      val host=args(0)
      val port=args(1)
      val masterHost=args(2)
      val masterPort=args(3).toInt
      val mem =args(4).toInt
      val cores=args(5).toInt
      //準備配置
      val config=
        s"""
          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          |akka.remote.netty.tcp.hostname = "$host"
          |akka.remote.netty.tcp.port = "$port"
        """.stripMargin
      val cfg =  ConfigFactory.parseString(config)
      val actorSystem =  ActorSystem("WorkerSystem",cfg)
      //建立worker
      actorSystem.actorOf(Props(new Worker(masterHost,masterPort,mem,cores)),name = "Worker")
      actorSystem.awaitTermination()
    }
  }