使用scala實現簡單的rpc案例
題目:使用scala的actor構建一個簡單的RPC呼叫例項
模仿ResourceManager和NodeManager之間的互動,
1、NodeManager向ResourceManager進行註冊(傳遞的引數是:主機名、記憶體、CPU、埠)
2、ResourceManager將接收到的訊息進行訊息匹配,如果是註冊訊息,將訊息儲存
3、傳送註冊請求的response給NodeManager(傳遞hostsname)
4、NodeManager接收到訊息,訊息匹配(註冊完成訊息)
5、NodeManager傳送心跳訊息給ResourceManager(每隔3秒鐘傳送一次)(傳送nodemanagerId)
6、ResourceManager接收到NodeManager的心跳訊息之後將心跳訊息儲存起來(包含提交的時間,要實時更新)
7、ResourceManager端每隔3秒鐘掃描一次心跳列表,如果當前時間減去節點註冊心跳時間大於10秒,則從列表刪除
分析:
1、首先無論是ResourceManager還是NodeManager在我們的需求中都是通過actor進行互動的
2、構建ResourceManger和NodeManager,並測試簡單的通訊
3、在NodeManager中主要完成註冊和定時傳送心跳兩個功能,ResourceManager中完成註冊資訊和心跳資訊的儲存
以及相應NodeManager請求和宕機節點資訊的清除的功能
a、首先我們通過模式匹配來完成呼叫對方方法的功能
b、我們將註冊和心跳都使用模式匹配類來傳遞資料
c、定義case class RegisterNodeManager來完成NodeManager到ResourceManager註冊的任務
d、定義case class Heartbeat來完成NodeManager向ResourceManager提交心跳的任務
e、定義case class RegisteredNodeManager來完成ResourceManager向Nodemanager相應的任務
f、在ResourceManager端使用class NodeManagerInfo儲存NodeManager的資訊
g、在ResourceManager端使用 case object CheckTimeOut來做宕機校驗的模式匹配,是單例的
h、在NodeManager端使用case object SendMessage來做心跳訊息傳送的模式匹配
i、對於定時呼叫,使用的是:
context.system.scheduler.schedule(initialDelay,interval,receiver,message)
*initialDelay: FiniteDuration, 多久以後開始執行
* interval: FiniteDuration, 每隔多長時間執行一次
* receiver: ActorRef, 給誰傳送這個訊息
* message: Any 傳送的訊息是啥
j、注意將硬性變數提取出來。
測試簡單通訊:先啟動resourceManager然後在啟動nodemanager。
簡單的測試:
ResourceManager:
NodeManager:import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory class ResourceManager(var hostname:String,var port:Int) extends Actor{ override def preStart(): Unit = { } override def receive:Receive = { case "hello" => { println("received nodemanager's message") sender() ! "hi" } } } object ResourceManager{ def main(args: Array[String]): Unit = { var str = """ |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname =localhost |akka.remote.netty.tcp.port=19999 """.stripMargin //通過字串解析建立config val config = ConfigFactory.parseString(str) //因為要使用actor,所以先構建actorSystem val ractorSystem: ActorSystem = ActorSystem("ResourceManagerActorSystem", config) //構建resourceManger的actor進行通訊 ractorSystem.actorOf(Props(new ResourceManager("localhost",19999)),"ResourceManagerActor") } }
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
class NodeManager(var resourceManagerHostName:String,
var resourceManagerPort:Int,
var memory:Int,var cpu:Int) extends Actor{
override def preStart(): Unit = {
var rmRef = context.actorSelection(s"akka.tcp://[email protected]:19999/user/ResourceManagerActor")
rmRef ! "hello"
}
override def receive:Receive = {
case "hi" =>{
println("received resourceManager's response")
}
}
}
object NodeManager{
def main(args: Array[String]): Unit = {
var str =
"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname =localhost
|akka.remote.netty.tcp.port=19997
""".stripMargin
val config = ConfigFactory.parseString(str)
val nactorSystem = ActorSystem("NodeManagerActorSystem",config)
nactorSystem.actorOf(Props(new NodeManager("localhost",19999,120,32)),"NodeManagerActor")
}
}
結果:在兩個類的控制檯打印出定義的兩句話即可。接下來將硬性變數提取出來做成常量,並建立模式匹配工具類和物件。
Constant:
object Constant {
val RMAS = "ResourceManagerActorSystem"
val RMA = "ResourceManagerActor"
val NMAS = "NodeManagerActorSystem"
val NMA = "NodeManagerActor"
}
Message//匹配NodeManager向ResourceManager註冊資訊
case class RegisterNodeManager(var nodeManagerId:String,var memory:Int,var cpu:Int)
//匹配ResourceManager響應NodeManager的請求
case class RegisteredNodeManager(var resourceManagerHostName:String)
//匹配NodeManager向ResourceManager傳送的心跳資訊
case class HeartBeat(var nodeManagerId:String)
//用於resourceManager中儲存nodeManager的資訊
class NodeManagerInfo(var nodeManagerId:String,var memory:Int,var cpu:Int)
//匹配宕機校驗
case object CheckTimeOut
//匹配心跳傳送
case object SendMessage
ResourceManager:package cn.zhao.lianxi
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
class ResourceManager(var hostname:String,var port:Int) extends Actor{
//可變的集合
private var id2nodeManagerInfo = new mutable.HashMap[String,NodeManagerInfo]()
private var nodeManagerInfoes = new mutable.HashSet[NodeManagerInfo]()
override def preStart(): Unit = {
import scala.concurrent.duration._
import context.dispatcher //目前不知道為什麼使用
//一開始就進行檢查
context.system.scheduler.schedule(0 millis, 5000 millis, self, CheckTimeOut)
}
override def receive:Receive = {
//通過偏函式的模式匹配 進行處理NodeManager的註冊請求
case RegisterNodeManager(nodeManagerId,memory,cpu) => {
//將NodeManager註冊時傳遞過來的引數封裝成類
val nodeManagerInfo = new NodeManagerInfo(nodeManagerId,memory,cpu)
//將封裝好的NodeManager的資訊儲存在hashmap和hashset中
//hashmap方便重複提交心跳時儲存
id2nodeManagerInfo.put(nodeManagerId,nodeManagerInfo)
//hashset方便checkTimeOut時遍歷 而且set會覆蓋舊的記錄
nodeManagerInfoes += nodeManagerInfo
//響應nodemanager的註冊請求 返回resourceManager的hostname和埠的組合
sender() ! RegisteredNodeManager(hostname+":"+port)
}
case HeartBeat(nodeManagerId) => {
val currentTime = System.currentTimeMillis()
val nodeManagerInfo = id2nodeManagerInfo(nodeManagerId)
//為了後面的宕機校驗 這裡要在心跳註冊新增最新的時間
//這就要求nodeManagerInfo中含有類似time,這裡命名為lastHeartBeatTime
nodeManagerInfo.lastHeartBeatTime = currentTime
//本來不用再次儲存 因為這裡使用的是引用
//但是為了清晰易懂還是在手動儲存
id2nodeManagerInfo(nodeManagerId)=nodeManagerInfo
//在hashSet中進行更新
nodeManagerInfoes += nodeManagerInfo
}
//檢驗是不是過期
case CheckTimeOut => {
val currentTime = System.currentTimeMillis()
nodeManagerInfoes.filter(nm => currentTime - nm.lastHeartBeatTime > 10000)
.foreach(downed => {
//從hashMap和hashSet中移除宕機的NodeManager的註冊資訊
nodeManagerInfoes -= downed
id2nodeManagerInfo.remove(downed.nodeManagerId)
})
println("健康的節點數量:"+nodeManagerInfoes.size)
}
}
}
object ResourceManager{
def main(args: Array[String]): Unit = {
val RESOURCEMANAGER_HOSTNAME = args(0)
val RESOURCEMANAGER_PORT = args(1).toInt
var str =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname =$RESOURCEMANAGER_HOSTNAME
|akka.remote.netty.tcp.port=$RESOURCEMANAGER_PORT
""".stripMargin
//通過字串解析建立config
val config = ConfigFactory.parseString(str)
//因為要使用actor,所以先構建actorSystem
val ractorSystem: ActorSystem = ActorSystem(Constant.RMAS, config)
//構建resourceManger的actor進行通訊
ractorSystem.actorOf(Props(new ResourceManager(RESOURCEMANAGER_HOSTNAME,RESOURCEMANAGER_PORT)),Constant.RMA)
}
}
NodeManager:
package cn.zhao.lianxi
import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.language.postfixOps
//resourceManagerHostName ==> rmHostName
//resourceManagerPort ==> rmPort
class NodeManager(var rmHostName:String,
var rmPort:Int,
var memory:Int,var cpu:Int) extends Actor{
//ActorSelection在伴生物件中可以看出型別,而且我們也可以通過.var+tab鍵檢視
// _ 是佔位符,可以自動賦值
var rmRef:ActorSelection = _
var nodeManagerId:String = _
override def preStart(): Unit = {
rmRef = context.actorSelection(s"akka.tcp://${Constant.RMAS}@${rmHostName}:${rmPort}/user/${Constant.RMA}")
nodeManagerId = UUID.randomUUID().toString
rmRef ! RegisterNodeManager(nodeManagerId,memory,cpu)
}
override def receive:Receive = {
//通過模式匹配獲取resourceManager返回的註冊請求的響應
case RegisteredNodeManager(resourceManagerHostName) =>{
println(resourceManagerHostName)
//傳送心跳
//根據原始碼中得到首先給自己傳送一次心跳(原因目前不知,以後補上)
import scala.concurrent.duration._
import context.dispatcher
//
context.system.scheduler.schedule(0 millis,4000 millis,self,SendMessage)
}
case SendMessage =>{
import scala.concurrent.duration._
import context.dispatcher
//rmRef要提升為全域性變數,因為給自己傳送了一次 所以不提升代表的就是本身
//將nodeManageId提升為全域性變數 在這裡才可以使用
rmRef ! HeartBeat(nodeManagerId)
}
}
}
object NodeManager{
def main(args: Array[String]): Unit = {
val RESOURCEMANAGER_HOSRNAME = args(0)
val RESOURCEMANAGER_PORT = args(1).toInt
val NODEMANAGER_HOSTNAME = args(2)
val NODEMANAGER_PORT = args(3).toInt
val RESOURCEMANAGER_MEMORY = args(4).toInt
val RESOURCEMANAGER_CPU = args(5).toInt
var str =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname =$NODEMANAGER_HOSTNAME
|akka.remote.netty.tcp.port=$NODEMANAGER_PORT
""".stripMargin
val config = ConfigFactory.parseString(str)
val nactorSystem = ActorSystem(Constant.NMAS,config)
nactorSystem.actorOf(Props(new NodeManager(RESOURCEMANAGER_HOSRNAME,RESOURCEMANAGER_PORT,
RESOURCEMANAGER_MEMORY,RESOURCEMANAGER_CPU)),Constant.NMA)
}
}