1. 程式人生 > >Scala之——程式設計實戰

Scala之——程式設計實戰

1. 專案概述

1.1.需求

目前大多數的分散式架構底層通訊都是通過 RPC 實現的, RPC 框架非常多,比如前我們學過的 Hadoop 專案的 RPC 通訊框架,但是 Hadoop 在設計之初就是為了執行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有 Hadoop RPC 顯得有些笨重。

Spark RPC 是通過 Akka 類庫實現的, Akka Scala 語言開發,基於 Actor 併發模型實現,Akka 具有高可靠、高效能、可擴充套件等特點,使用 Akka 可以輕鬆實現分散式 RPC 功能。

1.2. Akka 簡介

Akka 基於 Actor 模型,提供了一個用於構建可擴充套件的(

Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平臺。
Actor 模型:在電腦科學領域, Actor 模型是一個平行計算(Concurrent Computation)模型,它把 actor 作為平行計算的基本元素來對待:為響應一個接收到的訊息,一個 actor 能夠自己做出一些決策,如建立更多的 actor,或傳送更多的訊息,或者確定如何去響應接收到的下一個訊息。


Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的物件,Actor之間可以通過交換訊息的方式進行通訊,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及執行緒管理,可以非常容易地開發出正確地併發程式和並行系統,Actor具有如下特性:

  • 提供了一種高階抽象,能夠簡化在併發(Concurrency)/並行(Parallelism)應用場景下的程式設計開發
  • 提供了非同步非阻塞的、高效能的事件驅動程式設計模型
  • 超級輕量級事件處理(每GB堆記憶體幾百萬Actor)

2. 專案實現

2.1.架構圖


2.2.重要類介紹

2.2.1. ActorSystem

Akka 中, ActorSystem 是一個重量級的結構,他需要分配多個執行緒,所以在實際應用中,ActorSystem 通常是一個單例物件,我們可以使用這個 ActorSystem 建立很多 Actor

2.2.2. Actor

Akka 中, Actor 負責通訊,在 Actor 中有一些重要的生命週期方法。

  • preStart()方法:該方法在 Actor 物件構造方法執行後執行,整個 Actor 生命週期中僅執行一次。
  • receive()方法:該方法在 Actor preStart 方法執行完成後執行,用於接收訊息,會被反覆執行。

2.3. Master

package com.lyz.scala
import scala.concurrent.duration._
import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

/**
  * Master為整個叢集中的主節點
  * Master繼承了Actor
  * @author liuyazhuang
  */
class Master extends Actor{

  //儲存WorkerID和Work資訊的map
  val idToWorker = new mutable.HashMap[String, WorkerInfo]
  //儲存所有Worker資訊的Set
  val workers = new mutable.HashSet[WorkerInfo]
  //Worker超時時間
  val WORKER_TIMEOUT = 10 * 1000
  //重新receive方法

  //匯入隱式轉換,用於啟動定時器
  import context.dispatcher

  //構造方法執行完執行一次
  override def preStart(): Unit = {
    //啟動定時器,定時執行
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
  }

  //該方法會被反覆執行,用於接收訊息,通過case class模式匹配接收訊息
  override def receive: Receive = {
    //Worker向Master傳送的註冊訊息
    case RegisterWorker(id, workerHost, memory, cores) => {
      if(!idToWorker.contains(id)) {
        val worker = new WorkerInfo(id, workerHost, memory, cores)
        workers.add(worker)
        idToWorker(id) = worker
        sender ! RegisteredWorker("192.168.10.1")
      }
    }

    //Worker向Master傳送的心跳訊息
    case HeartBeat(workerId) => {
      val workerInfo = idToWorker(workerId)
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    }

    //Master自己向自己傳送的定期檢查超時Worker的訊息
    case CheckOfTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
      for(worker <- toRemove){
        workers -= worker
        idToWorker.remove(worker.id)
      }
      println("worker size: " + workers.size)
    }
  }
}

object Master {
  //程式執行入口
  def main(args: Array[String]) {

    val host = "192.168.10.1"
    val port = 8888
    //建立ActorSystem的必要引數
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem是單例的,用來建立Actor
    val actorSystem = ActorSystem.create("MasterActorSystem", config)
    //啟動Actor,Master會被例項化,生命週期方法會被呼叫
    actorSystem.actorOf(Props[Master], "Master")
  }
}

2.4. Worker

package com.lyz.scala

import java.util.UUID
import scala.concurrent.duration._
import akka.actor.{ActorSelection, Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

/**
  * Worker為整個叢集的從節點
  * Worker繼承了Actor
  * @author liuyazhuang
  */
class Worker extends Actor{

  //Worker端持有Master端的引用(代理物件)
  var master: ActorSelection = null
  //生成一個UUID,作為Worker的標識
  val id = UUID.randomUUID().toString

  //構造方法執行完執行一次
  override def preStart(): Unit = {
    //Worker向MasterActorSystem傳送建立連線請求
    master = context.system.actorSelection("akka.tcp://[email protected]:8888/user/Master")
    //Worker向Master傳送註冊訊息
    master ! RegisterWorker(id, "192.168.10.1", 10240, 8)
  }

  //該方法會被反覆執行,用於接收訊息,通過case class模式匹配接收訊息
  override def receive: Receive = {
    //Master向Worker的反饋資訊
    case RegisteredWorker(masterUrl) => {
      import context.dispatcher
      //啟動定時任務,向Master傳送心跳
      context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
    }

    case SendHeartBeat => {
      println("worker send heartbeat")
      master ! HeartBeat(id)
    }
  }
}

object Worker {
  def main(args: Array[String]) {
    val clientPort = 2552
    //建立WorkerActorSystem的必要引數
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.port = $clientPort
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("WorkerActorSystem", config)
    //啟動Actor,Master會被例項化,生命週期方法會被呼叫
    actorSystem.actorOf(Props[Worker], "Worker")
  }
}