Scala程式設計實戰 RPC
阿新 • • 發佈:2019-01-09
1. 熟練使用Scala編寫程式
2. 專案概述
2.1 需求
目前大多數的分散式架構層通訊都是通過RPC實現的,RPC架構非常多,比如Hadoop專案的RPC通訊框架,但是Hadoo在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重 Spark的RPC是通過AKKa類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具有高可靠、高效能、可擴充套件等特點,使用Akka可以輕鬆實現分散式RPC功能。
2.2 Akka簡介 Akka基於Actor模型,提供了一個用於構建可擴充套件(Scalable),彈性的(Resilient),快速響應的(Responsive)應用程式的平臺。 Actor模型:在電腦科學領域,Actor模型是一個平行計算(ConCurrent Computation)模型,它把actor做出一些決策,如建立更多的actor,或傳送更多的actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。
Actor是一個Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每一個Actor都有自己的收件箱(MailBox)。通過Actor能夠簡化鎖及執行緒還禮,可以非常的開發出正確併發程式和並行系統,Actor具有如下特性:
1.提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(parallelism)應用場景下的程式設計開發
2. 提供了非同步非阻塞的,高效能的時間驅動程式設計模型
3.超級了輕量級事件處理(每GB堆記憶體幾百萬Actor)
3. 專案實現
3.1 架構圖
3.2 重要類介紹
3.2.1 ActorSystem
在Akka中,ActorSystem是一個重要級的結構,需要分配多個執行緒,所以在實際應用中,ActorSystem通常是一個單例物件,可以使用這個ActorSystem建立很多Actor.
3.2.2 Actor
在Akka中,Actor負責通訊,在Actor中有一些重要的生命週期方法
1.preStart()方法:該方法Actor物件構造方法執行後執行,整個Actor生命週期中僅執行一次。
2.receive()方法:該方法在Actor的preStart方法執行完成後執行,用於接收訊息,會被返回執行。
1.WorkInfo類
package com.zhiyou100.ScalaActor_akka.Worker
/*用來封裝master接受的worker的資訊
@param id
@param mem
@param cores
*/
class WorkInfo(val id:String,val mem:Int,val cores:Int) {
var LastBeat :Long = _
}
2.IntervalMeaage類
package com.zhiyou100.ScalaActor_akka.Worker
/*互動訊息:用來封裝Master與Worker進行互動的訊息
1.註冊 2.Master的反饋 3.心跳 4.檢查資訊(時間超時)
這裡面都是樣例類,因為在網路傳輸所以必須實現序列化
*/
trait IntervalMeaage extends Serializable //抽象類
//註冊樣例類,封裝了從Worker到Master的註冊訊息
case class RegisterWorker(id:String,mem:Int,cores:Int) extends IntervalMeaage //mem 記憶體 core核數
//註冊成功確認訊息
case class RegisterFinish(masterURL:String) extends IntervalMeaage
//傳送心跳 worker -->worker
case object SendHeartBeat
//傳送心跳 worker -> master
case class HeartBeat (id:String) extends IntervalMeaage
//超時檢查 master -->master
case object CheckTimeOut
3.Master類
package com.zhiyou100.ScalaActor_akka.Worker
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
import scala.concurrent.duration._
class Master extends Actor{
//用來儲存worker傳輸過來的資訊,value最好一個實體類,以後方便使用其中屬性
val ids=new mutable.HashMap[String,WorkInfo]()
//儲存map中的workInfo的集合,方便以後屬性的排序
val workers=new mutable.HashSet[WorkInfo]()
val CHECK_BEAT=2500
// 生命週期之啟動Actor contrl+o
override def preStart():Unit ={
println("Actor is preStart")
//啟動定時器,檢查worker心跳是否正常
import context.dispatcher
context.system.scheduler.schedule(0.millis,CHECK_BEAT.millis,self,CheckTimeOut)
}
//用於接收訊息
override def receive:Receive = {
case "connect" =>{
println("success")
sender ! "reply"
}
case "hello" =>{
println("1111" )
}
case RegisterWorker(id,mem,cores) =>{
//判斷一下worker是否註冊過
if (!ids.contains(id)){
//將封裝例項化
val workInfo = new WorkInfo(id,mem,cores)
//儲存資料策略: 1.儲存記憶體 2.持久化到磁碟 3. 儲存到zookeeper
//map的put的操作
ids (id) =workInfo
//set的追加操作
workers += workInfo
//建立樣例類傳送註冊成功確認訊息
sender ! RegisterFinish("akka.tcp://[email protected]$host:$port/user/Master")
}
}
case HeartBeat(id) =>{
if (ids.contains(id)){
val workInfo =ids(id)
val currentTime=System.currentTimeMillis()
//把心跳時間用當前的時間置換
workInfo.LastBeat =currentTime
}
}
case CheckTimeOut =>{
val concurrentTime=System.currentTimeMillis()
val toClean = workers.filter(x=> concurrentTime -x.LastBeat >CHECK_BEAT)
for (w<- toClean){
workers -=w
ids -=w.id
}
println("活著的wpoker數量:"+workers.size)
}
}
}
//老大
object Master {
def main(args: Array[String]): Unit = {
val host=args(0)
val port=args(1).toInt
//準備配置檔案
val config =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val cfg = ConfigFactory.parseString(config) //解析配置檔案
//actorSystem :Actor的領導,監控所有的actor,singletong
val actorSystem =ActorSystem("MasterSystem",cfg)
//建立actor
val master = actorSystem.actorOf(Props[Master],name="Master")
master ! "success"
actorSystem.awaitTermination() //讓程序等待,不結束
}
}
4. Worker類
package com.zhiyou100.ScalaActor_akka.Worker
import java.util.UUID
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class Worker (val masterHost:String,val masterPort: Int,val mem:Int,val cores:Int)
extends Actor{
var master:ActorSelection=_
val workerID=UUID.randomUUID().toString
val HEART_BEAT=2000
//建立連線
override def preStart(): Unit = {
//引數需要有/user/Master
master= context.actorSelection(s"akka.tcp://[email protected]$masterHost:$masterPort/user/Master")
// master ! "connect"
//向Master傳送註冊訊息,Master的receive方法中的case class去接受
master ! RegisterWorker(workerID,mem,cores)
}
override def receive : Receive ={
//worker收到master返回的確認訊息
case RegisterFinish (masterURL) =>{
println(masterURL)
//定時傳送心跳
//匯入影視轉換
import context.dispatcher
context.system.scheduler.schedule(0.millis,HEART_BEAT.millis,self,SendHeartBeat)
//schedule(0.millis ,HEART_BEAT.millis,self,SendHeartBeat)
}
//接收自己傳送給自己的心跳,然後在傳送給Master
case SendHeartBeat=>{
println("傳送心跳的Master")
master ! HeartBeat(workerID)
}
case "reply" =>{
println("haode")
}
}
}
object Worker{
def main(args: Array[String]): Unit = {
val host=args(0)
val port=args(1)
val masterHost=args(2)
val masterPort=args(3).toInt
val mem =args(4).toInt
val cores=args(5).toInt
//準備配置
val config=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val cfg = ConfigFactory.parseString(config)
val actorSystem = ActorSystem("WorkerSystem",cfg)
//建立worker
actorSystem.actorOf(Props(new Worker(masterHost,masterPort,mem,cores)),name = "Worker")
actorSystem.awaitTermination()
}
}