scala 學習總結
Scala 學習備忘要點
一、scala 基礎
-
Scala 中所有的值都是有型別的,包括數值和函式
-
Scala 中 不需要帶 分號 ;
-
中括號代表 [ ] : 泛型
-
“+” ,“ - ” ,“ * ” ,“ / ” 在 Scala 裡面都只是 方法
-
定義方法:
def 方法名稱 (引數列表) 返回型別 方法體
def add(x:Int,y:Int) :Int = x+y def add(x:Int,y:Int)(z:Int) :Int = (x+y) * z // 多引數列表
-
定義函式
val function = (x:Int ,y:Int ) => Int = x+y
val f1 = ((a: Int, b: Int) => a + b) val f2 = (a: Int, b: Int) => a + b val f3 = (_: Int) + (_: Int) val f4: (Int, Int) => Int = (_ + _)
-
匯入可變屬性Map
import mutable.Map
-
神奇的下劃線 _
- 可以將方法變成函式
- 用於一般的表示式,表示 所有
-
陣列
- 不可變陣列Array
- 可變陣列 ArrayBuffer
-
除了 :+ , 其他的 .+: .::: 等 都會反過來
-
匯入可變的屬性可以用這個方法:
import scala.collection.mutable._
-
要記住HshSet 是無序分佈的,這個不是隨機的意思,以為他們每次打印出的順序是一樣的
-
一般情況下 += 什麼的用在可變的集合
++ ++: : : : 之類的一般用在不可變的集合,會生成新的集合
-
apply 方法 的呼叫情況瞭解一下,例子就是 array陣列
val arr = Array("ha","bi","ci") // 這裡不需要new一個Array出來,會自動呼叫apply方法
-
抽象的類裡面的抽象方法不需要再用 abstract 修飾了
-
scala 裡,單例物件的屬性欄位和方法都是靜態的
-
把類的class 修飾符 改成 object 就是第一個單例類,所有的方法也都是靜態的
-
伴生物件 和 他的半生類 之間的私有屬性 和 私有方法 都說可以互相呼叫的,公共的更是可以了,所以這兩基本就是互通的
-
類的構造器有主構造器和輔助構造器,在我們呼叫輔助構造器的時候 他會自動呼叫主構造器的東西
-
抽象類中的抽象方法不需要加abstract 關鍵字
-
在scala 中的方法體裡 可以在定義方法,但是 Java中 不可以
-
補充知識:修飾類的 可見性範圍 private [ 包名 ]
private [cn] class Animal
[ ] 裡的就是表示該類的可見性範圍,例如上面的是 在 cn.qphone… 在cn 包下可見
-
模式匹配元素,元組 (都是可以帶守衛的)
arr match { case "xx" => xx }
-
樣例類:用於模式匹配的, 樣例類可以不new,解析器會自動建立他的伴生物件和apply 方法
:樣例類是一種特殊的類,可用於模式匹配。case class是多例的,後面要跟構造引數,case object是單例的
case class Person(val name:String) //值得注意的是。單例類是不能再類名後加 () 的,本身即是主構造器 case object TimeOutTask val arr = Array(Person("Tom"),21,1000,TimeOutTask) //然後這裡面的case class,都不用new
-
Option 本身是介面(特質):子類有: some 和 none
:Option型別樣例類用來表示可能存在或也可能不存在的值(Option的子類有Some和None)。Some包裝了某個值,None表示沒有值
val names = Map(("tom",80),("jder",44)) val key = "tom" var score = names.get("tom") match { case Some(value) => value case None => 0 } // 等價於 score = names.getOrElse(key,0) // 如果是map 的話可以這樣用
-
偏函式:PartialFunction [A , B ] 裡的兩個引數,第一個是輸入的 引數型別 ,第二個是返回型別
:被包在花括號內沒有match的一組case語句是一個偏函式
def func1: PartialFunction[String, Int] = { case "one" => 1 case "two" => 2 case _ => -1 } // 等價於 def func2(num: String) : Int = num match { case "one" => 1 case "two" => 2 case _ => -1 }
二、Scala 的高階特性
1. 閉包
2.有名函式,匿名函式,方法轉換成函式
3.柯里化 (這個感覺用不到,挺雞肋的一東西,但是底層用的極多),這就是柯里化
// 定義一個奇怪的方法 (方法體裡面包含了一個匿名的函式)
def m(x:Int) = (y:Int) => x*y
// 簡化的寫法 ------------》》 我覺著可以寫成 def ma (x:int,y:Int) = x * y
def ma(x:Int)(y:Int) =x * y
println(m(1)(3))
println(ma(1)(3))
4.關於 區域性常量 和 函式定義 的一些內容, 不過作用範圍不知道是否相同
// 嚴格而言,f1 現在是區域性常量,型別是匿名函式型別 用 : 定義,函式的宣告不能寫引數名,要寫型別
val f1 :(Int) => Int = { x => x * 4}
// 簡化一下
val f1_1 :(Int) => Int = _ * 4
println(f1(1))
// 等價於 (這個是函式 用 = 定義,要寫引數名和型別)
val f2 = (x:Int) => x * 4
// 還可以等價於
val f3 = (_:Int) * 4
5.0 回顧Java中的 comparable 和 comparator
- 底層的資料結構是 紅黑二叉樹
5.1隱式轉換案例:ordering —> 相當於 comparator
- gril 的實體類
- 簡單的賦予其屬性
- 準備一個 比較器 的單例類,但是 implicit 修飾的單例物件不能作為頂級類,需要外套一個物件
- 重寫Ordering的 compare 的方法
- 準備一個選美的 類(比較顏值 )
- choose (隱式引數) 方法,選擇誰更美
- main方法: - 匯入隱式物件
- 準備兩個girl 例項
- 準備選美的例項,並呼叫其中的方法
- 顯示結果
===========================================================================================
- 在ordering 裡,觸發 隱式物件的時候是在 :order.gt(a ,b) 的時候會 觸發 比較器 Mycompare
- 利用隱式引數來將隱式物件傳進來,不過要求隱式物件是單例的,即 比較器 是單裡的
- 比較器 MyCompare 就是 Ordering 特質的例項
---- 原始碼如下
/**
* Description:scala中的的隱式引數傳遞演示
* 隱式引數:
* ①若一個方法的引數使用implicit來修飾,就是隱式引數
* ②呼叫包含隱式引數的方法時,不要顯示地傳遞引數值,前提:實現準備一個與隱式引數型別相同的例項
* ③呼叫時,例項會自動傳入到包含隱式引數的方法中
*/
//Girl實體類
class Girl(val name: String, val colorValue: Double) {
override def toString = s"Girl($name, $colorValue)"
}
//準備一個比較器的例項(此處使用Orderring方式,相當於Java中的Comparator)
object Outer {
//注意:使用implicit修飾的單例物件不能作為頂級類,物件,需要外套一個Object或是Class
implicit object MyComparator extends Ordering[Girl] {
override def compare(x: Girl, y: Girl): Int = (x.colorValue -y.colorValue).asInstanceOf[Int];
}
}
//準備一個類,用於選美,M:在執行中會被Girl型別來取代,應為指定了比較規則,此處不會編譯報錯。編譯器認為:Girl例項是可比較的
class SelectBeauty[M: Ordering](val girl1: M, val girl2: M) {
/**
* 選擇一個顏值高的girl
*
* 比較器中的比較規則觸發執行的時機是:
* order.gt(girl1, girl2)
*
* 下述方法在呼叫時, 不需要傳遞引數,前提:準備號一個隱式單例物件即可,執行時會自動傳入
* *
* 只能將隱式物件自動傳入到隱式引數中!!
*
* @param order
* @return
*/
def choose(implicit order: Ordering[M]): M = if (order.gt(girl1, girl2)) girl1 else girl2
}
object OderringWay extends App{
//前提:匯入隱式物件,就是Orderring 特質的例項(比較器的例項),就是MyComparator
import Outer._
//步驟:
//①準備兩個girl的例項
val my:Girl = new Girl("曼玉",78.89)
val yc:Girl = new Girl("宇春",70.99)
//②構建SelectBeauty的例項
val beauty = new SelectBeauty(yc,my)
//③呼叫例項中的選美方法,獲得要給顏值高的girl顯示
val highColorGirl:Girl = beauty.choose
//④顯示結果
println("高顏值的女該資訊是:\n"+highColorGirl)
}
5.2隱式轉換案例:ordered — > 相當於 comparable
- girl 的實體類
- 準備一個 object ,在裡面寫 隱式轉換函式(thisGirl:Girl):(每部要重寫compare(thatGirl:Girl)方法)
- 選美的類[ ] , 在類後面對[ T <% Ordered[T] ] 裡放了個 視界
-(解釋:A <% B 表示型別變數A 必須是 型別B` 的子類,或者A能夠隱式轉換到B)
- 這次裡面的方法不需要寫 隱式引數
- main - 同上
==========================================================================================
- 在ordered 這裡,觸發 隱式函式的時機是在 new girl 的時候
---- 原始碼如下
// Description:scala中的的隱式引數傳遞演示
//Girl實體類
class Girl(val name: String, val colorValue: Double) {
override def toString = s"Girl($name, $colorValue)"
}
object ImplicitChange {
/**
* 下述隱式轉換函式自動觸發執行的時機是:當new了一個Girl的例項時
*/
implicit def autoChangeGirlToOrdered(thisGirl: Girl) = new Ordered[Girl] {
//本質:new的是特質Ordered匿名的實現類的例項
/**
* 方法中傳入的當前girl的例項與別的例項進行比較
*/
override def compare(thatGirl: Girl): Int = (thisGirl.colorValue - thatGirl.colorValue).asInstanceOf[Int]
}
}
/**
* 選美的類
*/
//視界定義: A <% B ,表示型別變數A 必須是 型別B`的子類,或者A能夠隱式轉換到B
class SelectBeauty[T <% Ordered[T]](val girl1: T, val girl2: T) {
/**
* 選美 (注意:下述方法沒有使用隱式引數)
* girl1 > girl2會去呼叫compare方法
*/
def choose = if (girl1 > girl2) girl1 else girl2
}
object OderedWay extends App {
//前提:匯入隱式轉換方法(函式)
import ImplicitChange._
//步驟:
//①準備兩個girl的例項
val my: Girl = new Girl("曼玉", 50.9999)
val yc: Girl = new Girl("宇春", 60.45)
//②構建SelectBeauty的例項
val beauty = new SelectBeauty(my, yc)
//③呼叫例項中的選美方法,獲得要給顏值高的girl資訊
val highColorGirl: Girl = beauty.choose
//④顯示結果
println("~~>高顏值的女該資訊是:\n" + highColorGirl)
}
6. 泛型類
6.1 協變
定義一個型別List[+A],如果A是協變的,意思是:對型別A和B,A是B的子型別,那麼List[A]是List[B]的子型別。
6.2 逆變
定義一個型別Writer[-A],如果A是逆變的,意思是:對型別A和B,A是B的子型別,那麼Writer[B]是Writer[A]的子
型別。
6.3 上界
上界定義: T <: A
,表示型別變數T
必須是 型別A
子類
6.4 下界
語法 B >: A 表示引數型別或抽象型別 B 須是型別A的父類。通常,A是類的型別引數,B是方法的型別引數。
6.5 視界 (已過時)
視界定義: A <% B
,表示型別變數A 必須是 型別
B`的子類,或者A能夠隱式轉換到B
6.6 上下文界定
上下文界定的形式為 T : M, 其中M 必須為泛型類, 必須存在一個M[T]的隱式值.
三、Actor 程式設計
1 定義 :
基於事件模型的併發機制,Scala是運用訊息(message)的傳送、接收來實現多執行緒的
2. Java 執行緒 與 actor 的區別
3. Actor 方法的執行順序
- start() 開始執行緒
- act() 方法線上程啟動之後會被執行
- 最後向Actor 傳送訊息
-傳送訊息的方式: !(非同步無返回值), !? (同步等待返回值) ,!! (非同步,返回值:Future[Any])
4. Actor 實戰要點
- 可以用單例物件繼承 Actor, 然後重寫 act()方法
- recive 和 react 都是偏函式,react 會複用執行緒,比recive 更加的高效
- loop 適合與 react 搭配著使用
5. apply 方法的作用:
//強行讓子執行緒現執行,若是不呼叫apply()方法,主執行緒並不會等待子執行緒返回訊息,會直接執行後面的程式碼
6. 執行緒的同步非同步訊息特點 ,內部運作流程
//步驟:
//準備執行緒的例項
val thread = new MyThread
//啟動執行緒
thread.start
//Ⅰ - 傳送非同步訊息 ,特點:主執行緒傳送完畢後,繼續執行後續的程式碼,主執行緒沒有阻塞
//向子執行緒傳送非同步訊息(沒有返回值)
thread ! AnsyMsg(1,"你好!請為我做xxx")
println("已經向子執行緒傳送完非同步訊息了。。。。")
println("\n________________________________________\n")
// Ⅱ - 1 主執行緒向子執行緒傳送同步訊息,訊息型別是: 樣例類 ,建議訊息型別多使用樣例類,
// 特點: 主執行緒將訊息傳送完畢之後,會處於阻塞狀態,等待子執行緒執行完畢,子執行緒執行完畢後,主執行緒才會執行後續的程式碼
val accpetMsg = thread !? AnsyMsg(1, "你好!請為我做xxx")
println(s"主執行緒收到了來自子執行緒反饋過來的訊息$accpetMsg")
// Ⅱ - 2 主執行緒向子執行緒傳送同步訊息,訊息的型別是 元組 (key:Long, value: AnsyMsg樣例類型別),
// 特點: 主執行緒將訊息傳送完畢之後,會處於阻塞狀態,等待子執行緒執行完畢,子執行緒執行完畢後,主執行緒才會執行後續的程式碼
//與上述的不同之處:若是子執行緒返回的資訊採用的是非同步傳送的方式,主執行緒有可能沒有收到結果,與上述樣例類的訊息不同
val accpetMsg = thread !? (2000L, AnsyMsg(1, "你好!請為我做xxx"))
Thread.sleep(3000)
println(s"主執行緒收到了來自子執行緒反饋過來的訊息$accpetMsg")
println("\n________________________________________\n")
// Ⅲ 主執行緒向子執行緒傳送帶返回值的非同步訊息
// 特點:若是主執行緒中沒有呼叫apply方法,傳送完畢後,繼續執行後續的程式碼,主執行緒沒有阻塞
// 若是主執行緒中訊息傳送完畢後,呼叫了apply方法,先執行子執行緒,子執行緒執行完畢之後,然後再執行主執行緒,若子執行緒有返回值,返回值就是apply方法的返回值。
val msg: Future[Any] = thread !! SyncMsg(444, "哥們你好,吃飽了?")
println(msg.isSet)
println(msg)
println("訊息已經發送完畢\n\n")
//apply: 強行讓子執行緒先執行完畢
val realRsult = msg.apply()
println(realRsult)
println(msg.isSet)
println(msg)
println("訊息已經發送完畢")
}
7. WordCount 案例 :
- 教科書
object ActorExercise {
def main(args: Array[String]): Unit = {
//準備目錄(或者是目錄下的多個檔案的路徑)
val files = Array("data/words.log", "data/words.txt", "data/words2.txt")
//準備一個ListBuffer可變集合,用於儲存所有檔案中單詞出現的總次數
val container: ListBuffer[MiddleResult] = ListBuffer[MiddleResult]()
//通過迴圈來計算各個檔案單詞出現的總次數
for (file <- files) {
//迴圈體:
//每迴圈一次,構建一個全新的執行緒例項,求出每一個檔案中單詞出現的總次數,並存入到ListBuffer集合中
//建立執行緒例項
val thread = new Task
thread.start()
//傳送帶返回值的非同步訊息
val fut = thread !! SubmitTask(file)
val perResult: MiddleResult = fut.apply().asInstanceOf[MiddleResult] // 這裡要做一個轉換
//將結果存入容器中
container += perResult
}
//分析ListBuffer,將所有檔案中單詞的總次數再次聚合起來,並顯示結果
//container.foreach(println)
// 1 + 2 + 3
//foldLeft(0) :0 和的初始值
//(x:Int,y:(String,Int))=>x+y._2) : x ~>每次累加的和; y: ListBuffer中的每一個元組
//println(container.flatMap(_.result).groupBy(_._1).mapValues(_.foldLeft(0)((x:Int,y:(String,Int))=>x+y._2)))
//println(container.flatMap(_.result).groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2)))
container.flatMap(_.result).groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2)).toList.sortBy(_._2).reverse.foreach(x=>println(x._1+"\t"+x._2))
}
}
//設計一個樣例類MiddleResult,用來儲存每個檔案中單詞出現的總次數
case class MiddleResult(result: Map[String, Int]) // 這裡定義的時候經常會出錯
//設計一個樣例類SubmitTask, 用作主執行緒向子執行緒傳送的訊息
case class SubmitTask(file: String) // 這個可以不設
/**
* 設計要給執行緒類Task,用於計算指定檔案中單詞出現的總次數,計算完畢後,向主執行緒反饋結果
*/
class Task extends Actor {
override def act(): Unit = {
loop {
react {
case SubmitTask(file) => {
val lst: List[String] = Source.fromFile(file).getLines.toList
val nowResult: Map[String, Int] = lst.flatMap(_.split(" "))
.filter(_.trim != "").map((_, 1)).groupBy(_._1).map(t => (t._1, t._2.size))
sender ! MiddleResult(nowResult) // 傳送的返回值要用樣例類包起來
}
}
}
}
}
- 精簡版
class MyActor extends Actor {
override def act(): Unit = {
loop {
react{
case file:String => {
val map:Map[String, Int] = Source.fromFile(file).getLines().flatMap(_.split(" ")).filter(_.trim != "").toList.map((_,1)).groupBy(_._1).map((t => (t._1,t._2.size)))//.toList.sortBy(_._2).reverse
sender ! Tuple(map)
}
case _ => println("msg 錯誤")
}
}
}
}
case class Tuple (map:Map[String,Int])
object Word_count {
def main(args: Array[String]): Unit = {
var list = ListBuffer[Tuple]()
val files = List("E:\\1.txt","E:\\2.txt","E:\\3.txt")
for(file <- files ) {
val task = new MyActor
task.start()
val msg = task !! file
val list2 = msg.apply().asInstanceOf[Tuple] // 想要解決這個型別轉換錯誤的問題還需要將上面傳過來的值給封裝一下,封裝成樣例類
list.append(list2)
}
// list.flatMap(_.map).groupBy(_._1).mapValues(_.foldLeft(0)( (x:Int,y:(String,Int)) => x + y._2)).toList.sortBy(_._2).reverse.foreach(println)
list.flatMap(_.map).groupBy(_._1).mapValues(_.foldLeft(0) (_ + _._2) ).toList.sortBy(_._2).reverse.foreach(println)
}
}
四、Scala 程式設計實戰: Akka 構建簡易的 spark 框架
- Akka 定義:
基於Actor模型,提供了一個用於構建可擴充套件的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程式的平臺。(最底層是nety)
- akka 涉及的一些核心API
- Configuration
- Config
- ActorSystem
- Props
- Actor
- Scheduler
- Actor從父類繼承過來的一個屬性context
- ActorRef
- ActorSelection -> 在Worker程序中獲得對應的Master程序的例項
自定義spark框架
- Master
- 樣例類:
//Worker程序註冊資訊樣例類
- case class RegistWorker(id:String,host:String,memory:Long,coreNum:Int)
//Worker程序已註冊的資訊樣例類
- case class RegistedWorker(id: String)
//超時時間的樣例類 資訊
- case object TimeOutMsg
//容器,儲存所有Worker程序的資訊
- case class WorkerInfo(id:String,host:String,memory:Long,coreNum:Int){
var lastSendHeartBeatTime:Long = 0L // 封裝最後一次心跳包的時間 }
- class Master extends Actor {
- mutable.Map
- mutable.Set
- override def preStart():Unit { // 執行緒初始化處理
- import context.dispatcher // 啟動定時器,用於自檢
//dispatcher翻譯:排程程式 scheduler翻譯:排程器
- context.system.scheduler.schedule( // 下面是引數
- FiniteDuration(0,TimeUnit.SECONDS), // 第一次啟動定時器的時候,延長多少時間啟動
- FiniteDuration(3,TimeUnit.SECONDS), // 以後定時器每隔多長時間啟動一次
- self, // 是當前Actor 例項的代理物件 --- 類似明星的經濟人
- TimeOutMsg // 樣例類,要傳送的訊息
)
}
- override def receive():Unit { // 執行緒體的具體邏輯,會被後臺的迴圈體呼叫
- case RegistWorker //判斷容器中是否存在Worker,不存在就新增到容器中
- case HeartBeatMsg //來自Worker心跳包資訊,從容器中取出,給Worker的最後傳送心跳包時間屬性賦值
- case TimeOutMsg//Maste給自己傳送的,檢測目前能正常工作的Worker程序的訊息
- 這裡面會 計算 當前時間與最後一次傳送心跳包的時間差, 若 > 3,則Worker宕機,從容器中移除
}
}
object Master {
def main(args: Array[String]): Unit = {
- 準備ActorSystem的例項,負責建立和監督執行緒
- host
- port
- configStr
- config
- actorSystem
- 建立Actor執行緒的例項
actorySytem.actorOf(Props[Master],"Master")
- 列印顯示程序已啟動
}
}
- Worker
- case class HeartBeatMsg(id:String) //心跳包樣例類
class Worker extends Actor{
- Master 的屬性
- 執行緒的ID
- 執行緒的例項初始化處理
override def preStart():Unit ={
- 獲得Master的程序例項
- worker 程序向Master 傳送註冊資訊
}
- 執行緒例項初始化完畢之後,執行下述的receive方法,也是會被後臺的迴圈體呼叫
override def receive: Receive = {
- case RegistedWorker(id) =>{..} // 註冊過得程序樣例類,(啟動定時器)
- case HearBeattMsg(id) =>{..}// 心跳包樣例類,裡面要向Master傳送心跳包
}
}
object Woker {
def main(args: Array[String]): Unit = {
- 準備ActorSystem的例項,負責建立和監督執行緒
- host
- port
- configStr
- config
- actorSystem
- 建立Actor執行緒的例項
actorySytem.actorOf(Props[Worker],"Worker")
- 列印顯示程序已啟動
}
}
- 原始碼如下:
- Master
package cn.qphone.scala.Akka
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
/**
* description :Master 程序,負責計算資源排程的程序
*
* @author 王友俊
*/
// 定義樣例類
// Worker 程序註冊資訊樣例類
case class RegistWorker(id:String,host:String,memory:Long,coreNum:Int)
// Worker 程序 已經 註冊的資訊樣例類
case class RegistedWorker(id: String)
// 超時時間的樣例類資訊
case object TimeOutMsg
// 準備兩個容器,儲存所有的Worker程序的資訊
//WorkerInfo
case class WorkerInfo(id:String,host:String,memory:Long,coreNum:Int){
// 封裝了最後一次傳送心跳包的 時間(Worker to Master)
var lastSendHeartBeatTime:Long = 0L
}
class Master extends Actor{
// Map
val container:mutable.Map[String,WorkerInfo] = mutable.HashMap[String,WorkerInfo]()
//Set
val containerSet:mutable.Set[WorkerInfo] = mutable.HashSet[WorkerInfo]()
// 執行緒初始化處理
override def preStart(): Unit = {
// 啟動定時器,用於自檢(檢查有沒有宕機的Worker 程序資訊)
import context.dispatcher
context.system.scheduler.schedule( // 這裡面填的引數可以直接點進去schedule() 原始碼檢視
FiniteDuration(0,TimeUnit.SECONDS), // 第一次啟動定時器的時候,延長多少時間啟動
FiniteDuration(3,TimeUnit.SECONDS), // 以後定時器每隔多長時間啟動一次
self, // 是當前Actor 例項的代理物件 --- 類似明星的經濟人
TimeOutMsg // 樣例類,要傳送的訊息
)
}
// 執行緒體具體的邏輯,下面寫的邏輯會被後臺的迴圈體來呼叫
override def receive: Receive = {
// 來自Worker程序註冊的訊息
case RegistWorker(id,host,memory,coreNum) => {
// 判斷容器中是否不存在該Worker,不存在就新增到容器中
if(!container.contains(id)) {
val workerInfo:WorkerInfo = WorkerInfo(id,host,memory,coreNum)
container(id) = workerInfo // 向 Map 中新增
containerSet.add(workerInfo) // 向 Set 中新增
sender ! RegistedWorker(id) // master 向 worker 程序反饋已註冊的程序的資訊
}
}
// 來自Worker程序傳送過來的心跳包的資訊
case HeartBeatMsg(id) => {
try{
val info:WorkerInfo = container.getOrElse(id,null) // 從容器中取出心跳包中Worker程序的訊息
info.lastSendHeartBeatTime = System.currentTimeMillis() // 給Worker程序的屬性最後一次傳送心跳包的時間賦值
} catch {
case e : NullPointerException =>println("容器中已經沒有正在執行的程序了")
case _ : Exception => println("未知錯誤,請檢查你的機器")
}
}
//Master程序給自己傳送的用於檢測目前能正常運作的的Worker程序的訊息
case TimeOutMsg => {
println("TimeOurMsg 開始執行,當前容器中程序數為:" + container.size)
val nowTime = System.currentTimeMillis()
// 從容器中獲取所有超時的Worker程序的資訊取出來
//若當前時間與Worker程序最後一次向主執行緒Master傳送心跳包的時間差 > 3 ,此時,證明該Worker程序宕機。從容器中移除
val willDelWorkers:Array[WorkerInfo] = containerSet.filter(nowTime - _.lastSendHeartBeatTime > 3*1000).toArray
for (work <- willDelWorkers) {
// 從 Map 中刪除
container.remove(work.id)
// 從 Set 中刪除
containerSet.remove(work)
println("將宕機的程序" + work.id + "刪除...")
}
}
}
}
object Master{
def main(args: Array[String]): Unit = {
// 步驟
// 1. 準備ActorSystem的例項,這玩意是老大,負責建立 和 監督 執行緒
val host = "127.0.0.1"
val port = "9000"
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config:Config = ConfigFactory.parseString(configStr)
val actorySytem:ActorSystem = ActorSystem.create("MasterActorSystem",config)
// 2. 構建Actor執行緒的例項---
actorySytem.actorOf(Props[Master],"Master")
// 3. 顯示資訊,表示Master程序已經啟動
println("Master 程序已啟動 ...")
}
}
- Worker
package cn.qphone.scala.Akka
import java.util.UUID
import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration.FiniteDuration
/**
* description : worker 程序,是負責計算的程序
*
* @author 王友俊
*/
// 定義樣例類
// Worker程序想Master程序傳送的心跳包訊息樣例類
case class HeartBeatMsg(id:String)
class Worker extends Actor{
// Maste 型別的屬性
var master:ActorSelection = null
// 執行緒的ID,( UUID 可保證不重複)
val id = UUID.randomUUID().toString
// 執行緒例項初始化處理
override def preStart(): Unit = {
// 用來獲得Master的程序例項 // 方法需要一個path路徑:1、通訊協議、2、master的IP地址、3、master的埠 4、建立master actor老大 5、actor層級
master = context.system.actorSelection("akka.tcp://[email protected]:9000/user/Master")
// Worker程序想Master傳送註冊資訊 //RegistWorker(id: String, host: String, memory: Long, coreNum: Int)
master ! RegistWorker(id,"127.0.0.1",4,8)
}
// 執行緒例項初始化完畢之後,執行下述的receive方法,該方法會被後臺得迴圈體呼叫
override def receive: Receive = {
// 註冊過得Worker程序樣例類(啟動定時器)
case RegistedWorker(id) => {
import context.dispatcher
context.system.scheduler.schedule(FiniteDuration(0,TimeUnit.SECONDS),FiniteDuration(4,TimeUnit.SECONDS),self,HeartBeatMsg(id))
}
// 心跳包樣例類
case HeartBeatMsg(id) => {
// 向Master 傳送 心跳包
master ! HeartBeatMsg(id)
println("worker 已傳送心跳包...")
}
}
}
object Worker{
def main(args: Array[String]): Unit = {
// 步驟
// 1. 準備ActorSystem的例項,這玩意是老大,負責建立 和 監督 執行緒
val host = "127.0.0.1"
val port = "4402"
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config:Config = ConfigFactory.parseString(configStr)
val actorySytem:ActorSystem = ActorSystem.create("WorkerActorSystem",config)
// 2. 構建Actor執行緒的例項---
actorySytem.actorOf(Props[Worker],"Worker")
// 3. 顯示資訊,表示Master程序已經啟動
println("Worker 程序已啟動 ...")
}
}