1. 程式人生 > >大資料知識:快學Scala(三)Scala程式設計實戰

大資料知識:快學Scala(三)Scala程式設計實戰

簡介

        在學習完Scala語言後,我們可以實現一個簡單的RPC來鞏固前面我們學習的Scala知識點,這裡主要的知識點涉及樣例類、模式匹配、類繼承、隱式轉化以及函式和方法,如果需要原始碼可以在我的Github上下載,地址是SRPC

專案概述

需求

        目前大多數的分散式架構底層通訊都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop專案的RPC通訊框架,但是Hadoop在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重。

        Spark 的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具有高可靠、高效能、可擴充套件等特點,使用Akka可以輕鬆實現分散式RPC功能。

Akka簡介

        Akka基於Actor模型,提供了一個用於構建可擴充套件的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平臺。

        Actor模型:在電腦科學領域,Actor模型是一個平行計算(Concurrent Computation)模型,它把actor作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個actor能夠自己做出一些決策,如建立更多的actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。


        Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統,Actor具有如下特性:

         1.提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下的程式設計開發

        2.提供了非同步非阻塞的、高效能的事件驅動程式設計模型

        3.超級輕量級事件處理(每GB堆記憶體幾百萬Actor)

專案實現

架構圖

實現的主要思路

    該簡單的框架主要有兩個類Master、Worker,Master類主要用於Worke的註冊,定時的檢查節點是否存活,感知節點的上下線;Worker主要向Master註冊資訊,然後定期的排程任務。

程式碼實現

Master類的具體程式碼:
package cn.edu.hust

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration._
import scala.collection.mutable

class Master(val  host:String,val port:Int) extends Actor{
  val works=new mutable.HashMap[String,WorkInfo]()
  val works_avaliable=new mutable.HashSet[WorkInfo]()
  val heartBeat_interval:Long=15000
  //在構造方法之後,receive方法之前啟動
  override def preStart(): Unit = {
      println("start method invoked")
      import context.dispatcher
      context.system.scheduler.schedule(0 millis ,heartBeat_interval millis,self,CheckHeartBeat)
  }

  def func(worker:WorkInfo):Boolean={
    System.currentTimeMillis()-worker.Pre_Time>heartBeat_interval
  }

  override def receive():Receive = {
    //worker向服務端註冊資訊
    case RegisterWorkInfo(id,memory,cores) =>{
      //如果沒有包含這個worker,那麼就將這個worker的資訊儲存在本地,然後傳送資訊
      if(!works.contains(id))
        {
          val worker_new=new WorkInfo(id,memory,cores)
          works.put(id,worker_new)
          works_avaliable+=worker_new
        }
      //傳送訊息給worker,如果註冊成功返回
      sender ! RegisterResponse(s"akka.tcp://
[email protected]
$host:$port/user/Master") } case HeartBeatToMaster(id) => { if(works.contains(id)) works(id).Pre_Time=System.currentTimeMillis() //works_avaliable. //sender() ! "ok" } case CheckHeartBeat=> { //val func=(worker:WorkInfo)=>Boolean {} val toRemove=works_avaliable.filter(func) for(info <- toRemove) { works -= info.id works_avaliable -= info } println(works.size) } } } object Master { def main(args: Array[String]): Unit = { val host=args(0) val port=args(1).toInt val conf=s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config=ConfigFactory.parseString(conf) //1.建立一個ActorSystem用於監控和管理所有的actor val actorSystem=ActorSystem.create("system",config) //建立一個actor,這裡表示Master val master=actorSystem.actorOf(Props(new Master(host,port)), "Master") //傳送非同步訊息 //master!"connect" //退出 actorSystem.awaitTermination() } }

Worker類的具體程式碼:

package cn.edu.hust

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class Worker(val masterHost:String,val masterPort:Int) extends Actor{
  var master:ActorSelection= _
  //給每一個worker分配一個唯一的id
  val id=UUID.randomUUID().toString
  val beats=10000
  override def preStart(): Unit = {
    //1.連線server,獲取master例項
    master=context.actorSelection(s"akka.tcp://[email protected]$masterHost:$masterPort/user/Master")
    //2.使用master例項註冊訊息
    master! RegisterWorkInfo(id,10240,4)

  }

  override def receive ():Receive = {
    case RegisterResponse(url)=>{
      println(s"the master work in $url")
      import context.dispatcher
      //配置一個定時排程器,定時向master傳送任務
      context.system.scheduler.schedule(0 millis ,beats millis,self,HeartBeat)
    }
    case HeartBeat=>{
      master!HeartBeatToMaster(id)
    }
  }
}
object Worker
{
  def main(args: Array[String]): Unit = {
      val masterHost=args(0)
      val masterPort=args(1).toInt
      val workerHost=args(2)
      val workerPort=args(3).toInt
      val conf=s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
                   |akka.remote.netty.tcp.hostname = "$workerHost"
                   |akka.remote.netty.tcp.port = "$workerPort"
                 """.stripMargin
      val config=ConfigFactory.parseString(conf)
      val system=ActorSystem.create("worker",config)
      val worker=system.actorOf(Props(new Worker(masterHost,masterPort)),"Worker")
    system.awaitTermination()
  }
}

附帶的程式碼:

package cn.edu.hust

trait RemoteMessage extends Serializable
//從worker到master的訊息
case class RegisterWorkInfo(id:String,memory:Int,cores:Int) extends RemoteMessage
//master向worker傳送訊息
case class RegisterResponse(url:String) extends RemoteMessage
//master向worker傳送心跳訊息
case class HeartBeatToMaster(id:String)extends RemoteMessage
//worker自己向自己傳送心跳資訊
case object HeartBeat
case object CheckHeartBeat

package cn.edu.hust

class WorkInfo(val id:String,val memory:Int, val cores:Int) {
  //儲存上一次心跳通訊的時間
  var Pre_Time:Long=_
}