1. 程式人生 > >使用scala實現簡單的rpc案例

使用scala實現簡單的rpc案例

題目:使用scala的actor構建一個簡單的RPC呼叫例項
模仿ResourceManager和NodeManager之間的互動,
1、NodeManager向ResourceManager進行註冊(傳遞的引數是:主機名、記憶體、CPU、埠)
2、ResourceManager將接收到的訊息進行訊息匹配,如果是註冊訊息,將訊息儲存
3、傳送註冊請求的response給NodeManager(傳遞hostsname)
4、NodeManager接收到訊息,訊息匹配(註冊完成訊息)
5、NodeManager傳送心跳訊息給ResourceManager(每隔3秒鐘傳送一次)(傳送nodemanagerId)
6、ResourceManager接收到NodeManager的心跳訊息之後將心跳訊息儲存起來(包含提交的時間,要實時更新)
7、ResourceManager端每隔3秒鐘掃描一次心跳列表,如果當前時間減去節點註冊心跳時間大於10秒,則從列表刪除

分析:
1、首先無論是ResourceManager還是NodeManager在我們的需求中都是通過actor進行互動的
2、構建ResourceManger和NodeManager,並測試簡單的通訊
3、在NodeManager中主要完成註冊和定時傳送心跳兩個功能,ResourceManager中完成註冊資訊和心跳資訊的儲存
以及相應NodeManager請求和宕機節點資訊的清除的功能
    a、首先我們通過模式匹配來完成呼叫對方方法的功能
    b、我們將註冊和心跳都使用模式匹配類來傳遞資料
    c、定義case class RegisterNodeManager來完成NodeManager到ResourceManager註冊的任務
    d、定義case class Heartbeat來完成NodeManager向ResourceManager提交心跳的任務
    e、定義case class RegisteredNodeManager來完成ResourceManager向Nodemanager相應的任務
    f、在ResourceManager端使用class NodeManagerInfo儲存NodeManager的資訊
    g、在ResourceManager端使用 case object CheckTimeOut來做宕機校驗的模式匹配,是單例的
    h、在NodeManager端使用case object SendMessage來做心跳訊息傳送的模式匹配
    i、對於定時呼叫,使用的是:
        context.system.scheduler.schedule(initialDelay,interval,receiver,message)
            *initialDelay:  FiniteDuration, 多久以後開始執行
            * interval:     FiniteDuration, 每隔多長時間執行一次
            * receiver:     ActorRef, 給誰傳送這個訊息
            * message:      Any  傳送的訊息是啥
    j、注意將硬性變數提取出來。
測試簡單通訊:先啟動resourceManager然後在啟動nodemanager。

簡單的測試:

ResourceManager:

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class ResourceManager(var hostname:String,var port:Int) extends Actor{


  override def preStart(): Unit = {

  }

  override def receive:Receive = {
    case "hello" => {
      println("received nodemanager's message")
      sender() ! "hi"
    }

  }


}

object ResourceManager{

  def main(args: Array[String]): Unit = {
    var str =
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname =localhost
        |akka.remote.netty.tcp.port=19999
      """.stripMargin
    //通過字串解析建立config
    val config = ConfigFactory.parseString(str)

    //因為要使用actor,所以先構建actorSystem
    val ractorSystem: ActorSystem = ActorSystem("ResourceManagerActorSystem", config)

    //構建resourceManger的actor進行通訊
    ractorSystem.actorOf(Props(new ResourceManager("localhost",19999)),"ResourceManagerActor")

  }
}
NodeManager:
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class NodeManager(var resourceManagerHostName:String,
                  var resourceManagerPort:Int,
                  var memory:Int,var cpu:Int) extends Actor{


  override def preStart(): Unit = {
    var rmRef = context.actorSelection(s"akka.tcp://[email protected]:19999/user/ResourceManagerActor")
    rmRef ! "hello"
  }

  override def  receive:Receive = {
    case "hi" =>{
      println("received resourceManager's response")

    }
  }
}

object NodeManager{

  def main(args: Array[String]): Unit = {
    var str =
      """
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname =localhost
        |akka.remote.netty.tcp.port=19997
      """.stripMargin
    val config = ConfigFactory.parseString(str)
    val nactorSystem = ActorSystem("NodeManagerActorSystem",config)
    nactorSystem.actorOf(Props(new NodeManager("localhost",19999,120,32)),"NodeManagerActor")
  }
}
結果:在兩個類的控制檯打印出定義的兩句話即可。

接下來將硬性變數提取出來做成常量,並建立模式匹配工具類和物件。

Constant:

object Constant {
    val RMAS = "ResourceManagerActorSystem"
    val RMA = "ResourceManagerActor"
    val NMAS = "NodeManagerActorSystem"
    val NMA = "NodeManagerActor"
}
Message
//匹配NodeManager向ResourceManager註冊資訊
case class RegisterNodeManager(var nodeManagerId:String,var memory:Int,var cpu:Int)
//匹配ResourceManager響應NodeManager的請求
case class RegisteredNodeManager(var resourceManagerHostName:String)
//匹配NodeManager向ResourceManager傳送的心跳資訊
case class HeartBeat(var nodeManagerId:String)

//用於resourceManager中儲存nodeManager的資訊
class NodeManagerInfo(var nodeManagerId:String,var memory:Int,var cpu:Int)

//匹配宕機校驗
case object CheckTimeOut
//匹配心跳傳送
case object SendMessage
ResourceManager:
package cn.zhao.lianxi

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable

class ResourceManager(var hostname:String,var port:Int) extends Actor{
  //可變的集合
  private var id2nodeManagerInfo = new mutable.HashMap[String,NodeManagerInfo]()
  private var nodeManagerInfoes = new mutable.HashSet[NodeManagerInfo]()


  override def preStart(): Unit = {
      import scala.concurrent.duration._
      import context.dispatcher  //目前不知道為什麼使用
      //一開始就進行檢查
      context.system.scheduler.schedule(0 millis, 5000 millis, self, CheckTimeOut)
  }



  override def receive:Receive = {
    //通過偏函式的模式匹配 進行處理NodeManager的註冊請求
    case RegisterNodeManager(nodeManagerId,memory,cpu) => {

      //將NodeManager註冊時傳遞過來的引數封裝成類
      val nodeManagerInfo = new NodeManagerInfo(nodeManagerId,memory,cpu)
      //將封裝好的NodeManager的資訊儲存在hashmap和hashset中
      //hashmap方便重複提交心跳時儲存
      id2nodeManagerInfo.put(nodeManagerId,nodeManagerInfo)
      //hashset方便checkTimeOut時遍歷 而且set會覆蓋舊的記錄
      nodeManagerInfoes += nodeManagerInfo


      //響應nodemanager的註冊請求  返回resourceManager的hostname和埠的組合
      sender() ! RegisteredNodeManager(hostname+":"+port)
    }

    case HeartBeat(nodeManagerId) => {
      val currentTime = System.currentTimeMillis()
      val nodeManagerInfo = id2nodeManagerInfo(nodeManagerId)
      //為了後面的宕機校驗 這裡要在心跳註冊新增最新的時間
      //這就要求nodeManagerInfo中含有類似time,這裡命名為lastHeartBeatTime
      nodeManagerInfo.lastHeartBeatTime = currentTime
      //本來不用再次儲存 因為這裡使用的是引用
      //但是為了清晰易懂還是在手動儲存
      id2nodeManagerInfo(nodeManagerId)=nodeManagerInfo
      //在hashSet中進行更新
      nodeManagerInfoes += nodeManagerInfo
    }

    //檢驗是不是過期
    case CheckTimeOut => {
      val currentTime = System.currentTimeMillis()
      nodeManagerInfoes.filter(nm => currentTime - nm.lastHeartBeatTime > 10000)
        .foreach(downed => {
          //從hashMap和hashSet中移除宕機的NodeManager的註冊資訊
          nodeManagerInfoes -= downed
          id2nodeManagerInfo.remove(downed.nodeManagerId)

        })
      println("健康的節點數量:"+nodeManagerInfoes.size)
    }

  }


}

object ResourceManager{
  def main(args: Array[String]): Unit = {
    val RESOURCEMANAGER_HOSTNAME = args(0)
    val RESOURCEMANAGER_PORT = args(1).toInt
    var str =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname =$RESOURCEMANAGER_HOSTNAME
        |akka.remote.netty.tcp.port=$RESOURCEMANAGER_PORT
      """.stripMargin
    //通過字串解析建立config
    val config = ConfigFactory.parseString(str)

    //因為要使用actor,所以先構建actorSystem
    val ractorSystem: ActorSystem = ActorSystem(Constant.RMAS, config)

    //構建resourceManger的actor進行通訊
    ractorSystem.actorOf(Props(new ResourceManager(RESOURCEMANAGER_HOSTNAME,RESOURCEMANAGER_PORT)),Constant.RMA)

  }
}

NodeManager:

package cn.zhao.lianxi

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.language.postfixOps

//resourceManagerHostName ==> rmHostName
//resourceManagerPort ==> rmPort
class NodeManager(var rmHostName:String,
                  var rmPort:Int,
                  var memory:Int,var cpu:Int) extends Actor{

  //ActorSelection在伴生物件中可以看出型別,而且我們也可以通過.var+tab鍵檢視
  // _ 是佔位符,可以自動賦值
  var rmRef:ActorSelection = _

  var nodeManagerId:String = _

  override def preStart(): Unit = {
    rmRef = context.actorSelection(s"akka.tcp://${Constant.RMAS}@${rmHostName}:${rmPort}/user/${Constant.RMA}")
    nodeManagerId = UUID.randomUUID().toString
    rmRef ! RegisterNodeManager(nodeManagerId,memory,cpu)
  }

  override def  receive:Receive = {
    //通過模式匹配獲取resourceManager返回的註冊請求的響應
    case RegisteredNodeManager(resourceManagerHostName) =>{


      println(resourceManagerHostName)

      //傳送心跳
      //根據原始碼中得到首先給自己傳送一次心跳(原因目前不知,以後補上)
      import scala.concurrent.duration._
      import context.dispatcher
      //
      context.system.scheduler.schedule(0 millis,4000 millis,self,SendMessage)
    }

    case SendMessage =>{
      import scala.concurrent.duration._
      import context.dispatcher
      //rmRef要提升為全域性變數,因為給自己傳送了一次 所以不提升代表的就是本身
      //將nodeManageId提升為全域性變數 在這裡才可以使用
      rmRef ! HeartBeat(nodeManagerId)

    }
  }
}

object NodeManager{

  def main(args: Array[String]): Unit = {
    val RESOURCEMANAGER_HOSRNAME = args(0)
    val RESOURCEMANAGER_PORT = args(1).toInt
    val NODEMANAGER_HOSTNAME = args(2)
    val NODEMANAGER_PORT = args(3).toInt
    val RESOURCEMANAGER_MEMORY = args(4).toInt
    val RESOURCEMANAGER_CPU = args(5).toInt
    var str =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname =$NODEMANAGER_HOSTNAME
        |akka.remote.netty.tcp.port=$NODEMANAGER_PORT
      """.stripMargin
    val config = ConfigFactory.parseString(str)
    val nactorSystem = ActorSystem(Constant.NMAS,config)
    nactorSystem.actorOf(Props(new NodeManager(RESOURCEMANAGER_HOSRNAME,RESOURCEMANAGER_PORT,
                                                     RESOURCEMANAGER_MEMORY,RESOURCEMANAGER_CPU)),Constant.NMA)
  }
}