使用scala的Actor模擬計算多檔案WordCount
阿新 • • 發佈:2018-12-25
scala的Actor是基於事件模型的,具體的模型可以自己查詢資料,這裡根據別人的demo程式碼自己也寫了一個基於Actor的事件模型的多檔案計算WordCount,程式碼中我寫了詳細的註釋,僅供參考
首先在D盤下面建立三個檔案,裡面寫一些單詞用空格分開:
具體程式碼如下:
package com.lijie.actor
import scala.actors._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source
/**
* Created by Lijie on 2017/3/22.
*/
class MyActor extends Actor {
//重寫Actor中的act()方法,和java中的run()一樣
override def act(): Unit = {
//相當於while(true) 這裡用react可以複用執行緒池
loop {
//這是個偏函式,定義兩個模式匹配
react {
case MapCalculation(fileName) => {
//計算檔案中的word的數量,並用sender把訊息傳送回去
sender ! MapResult(Source.fromFile(fileName).getLines().flatMap(_.split(" " )).map((_, 1)).toList.groupBy(_._1).mapValues(_.size))
}
}
}
}
}
//定義一個case class 用來模擬map的word統計
case class MapCalculation(fileName: String)
//定義一個case class 用來模擬map的word統計的返回結果封裝
case class MapResult(mapResult: Map[String, Int])
object MyActor {
def main(args: Array[String]): Unit = {
//用來儲存非同步返回的future
val futures = new mutable.HashSet[Future[Any]]()
//用來儲存map返回的值
val listRes = new ListBuffer[Map[String, Int]]()
//定義檔案的位置,類似於mapreduce的FileInputFormat.addInputPath
val files = Array("D:\\lijietest01.txt", "D:\\lijietest02.log", "D:\\lijietest03.txt")
//迴圈向MyActor傳送訊息
for (fileName <- files) {
//建立訊息物件 並啟動 且傳送非同步訊息
//每啟動一個傳送一個非同步訊息並返回一個Future,這裡同java中的Callable
val actor = new MyActor
//傳送非同步訊息並返回Future(! !? !!)
val future = actor.start() !! MapCalculation(fileName)
//扔到hashSet中
futures += future
}
//模擬reduce
while (futures.size > 0) {
//這裡取出已經計算完成的非同步返回 isSet類似於Callable的isDone返回boolean,將計算完的結果放到res
val res = futures.filter(_.isSet)
for (r <- res) {
//這裡的apply()相當於Callable的get(),強轉成MapResult(上面那個case class),取出裡面的map並且放入到一個list中
listRes += r.apply().asInstanceOf[MapResult].mapResult
//移除futures已經處理的物件,直到移除完 這個while迴圈就終止
futures -= r
//如果檔案很大,最後設定睡眠時間,不然cpu會空跑
Thread.sleep(100)
}
}
//對上面處理完成的map結果進行reduce
//其中這個listRes打印出來如下:(因為我這裡三個檔案內容一樣,所以每個map裡面的資料一樣)
// ListBuffer(
// Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1),
// Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1),
// Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1)
// )
val wordCount = listRes.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
//測試列印結果
//Map(world -> 3, soga -> 3, scala -> 6, lijie -> 12, abcd -> 3, hello -> 9, nihao -> 3)
println(wordCount)
}
}
執行結果如下: