Scala之——程式設計實戰
阿新 • • 發佈:2018-12-25
1. 專案概述
1.1.需求
目前大多數的分散式架構底層通訊都是通過 RPC 實現的, RPC 框架非常多,比如前我們學過的 Hadoop 專案的 RPC 通訊框架,但是 Hadoop 在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有 Hadoop 的 RPC 顯得有些笨重。
Spark 的 RPC 是通過 Akka 類庫實現的, Akka 用 Scala 語言開發,基於 Actor 併發模型實現,Akka 具有高可靠、高效能、可擴充套件等特點,使用 Akka 可以輕鬆實現分散式 RPC 功能。
1.2. Akka 簡介
Akka 基於 Actor 模型,提供了一個用於構建可擴充套件的(
Actor 模型:在電腦科學領域, Actor 模型是一個平行計算(Concurrent Computation)模型,它把 actor 作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個 actor 能夠自己做出一些決策,如建立更多的 actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。
Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統,Actor具有如下特性:
- 提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下的程式設計開發
- 提供了非同步非阻塞的、高效能的事件驅動程式設計模型
- 超級輕量級事件處理(每GB堆記憶體幾百萬Actor)
2. 專案實現
2.1.架構圖
2.2.重要類介紹
2.2.1. ActorSystem
在 Akka 中, ActorSystem 是一個重量級的結構,他需要分配多個執行緒,所以在實際應用中,ActorSystem 通常是一個單例物件,我們可以使用這個 ActorSystem 建立很多 Actor。
2.2.2. Actor
在 Akka 中, Actor 負責通訊,在 Actor 中有一些重要的生命週期方法。
- preStart()方法:該方法在 Actor 物件構造方法執行後執行,整個 Actor 生命週期中僅執行一次。
- receive()方法:該方法在 Actor 的 preStart 方法執行完成後執行,用於接收訊息,會被反覆執行。
2.3. Master 類
package com.lyz.scala
import scala.concurrent.duration._
import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
/**
* Master為整個叢集中的主節點
* Master繼承了Actor
* @author liuyazhuang
*/
class Master extends Actor{
//儲存WorkerID和Work資訊的map
val idToWorker = new mutable.HashMap[String, WorkerInfo]
//儲存所有Worker資訊的Set
val workers = new mutable.HashSet[WorkerInfo]
//Worker超時時間
val WORKER_TIMEOUT = 10 * 1000
//重新receive方法
//匯入隱式轉換,用於啟動定時器
import context.dispatcher
//構造方法執行完執行一次
override def preStart(): Unit = {
//啟動定時器,定時執行
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
}
//該方法會被反覆執行,用於接收訊息,通過case class模式匹配接收訊息
override def receive: Receive = {
//Worker向Master傳送的註冊訊息
case RegisterWorker(id, workerHost, memory, cores) => {
if(!idToWorker.contains(id)) {
val worker = new WorkerInfo(id, workerHost, memory, cores)
workers.add(worker)
idToWorker(id) = worker
sender ! RegisteredWorker("192.168.10.1")
}
}
//Worker向Master傳送的心跳訊息
case HeartBeat(workerId) => {
val workerInfo = idToWorker(workerId)
workerInfo.lastHeartbeat = System.currentTimeMillis()
}
//Master自己向自己傳送的定期檢查超時Worker的訊息
case CheckOfTimeOutWorker => {
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
for(worker <- toRemove){
workers -= worker
idToWorker.remove(worker.id)
}
println("worker size: " + workers.size)
}
}
}
object Master {
//程式執行入口
def main(args: Array[String]) {
val host = "192.168.10.1"
val port = 8888
//建立ActorSystem的必要引數
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config = ConfigFactory.parseString(configStr)
//ActorSystem是單例的,用來建立Actor
val actorSystem = ActorSystem.create("MasterActorSystem", config)
//啟動Actor,Master會被例項化,生命週期方法會被呼叫
actorSystem.actorOf(Props[Master], "Master")
}
}
2.4. Worker 類
package com.lyz.scala
import java.util.UUID
import scala.concurrent.duration._
import akka.actor.{ActorSelection, Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
/**
* Worker為整個叢集的從節點
* Worker繼承了Actor
* @author liuyazhuang
*/
class Worker extends Actor{
//Worker端持有Master端的引用(代理物件)
var master: ActorSelection = null
//生成一個UUID,作為Worker的標識
val id = UUID.randomUUID().toString
//構造方法執行完執行一次
override def preStart(): Unit = {
//Worker向MasterActorSystem傳送建立連線請求
master = context.system.actorSelection("akka.tcp://[email protected]:8888/user/Master")
//Worker向Master傳送註冊訊息
master ! RegisterWorker(id, "192.168.10.1", 10240, 8)
}
//該方法會被反覆執行,用於接收訊息,通過case class模式匹配接收訊息
override def receive: Receive = {
//Master向Worker的反饋資訊
case RegisteredWorker(masterUrl) => {
import context.dispatcher
//啟動定時任務,向Master傳送心跳
context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
}
case SendHeartBeat => {
println("worker send heartbeat")
master ! HeartBeat(id)
}
}
}
object Worker {
def main(args: Array[String]) {
val clientPort = 2552
//建立WorkerActorSystem的必要引數
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.port = $clientPort
""".stripMargin
val config = ConfigFactory.parseString(configStr)
val actorSystem = ActorSystem("WorkerActorSystem", config)
//啟動Actor,Master會被例項化,生命週期方法會被呼叫
actorSystem.actorOf(Props[Worker], "Worker")
}
}