1. 程式人生 > >使用scala的Actor模擬計算多檔案WordCount

使用scala的Actor模擬計算多檔案WordCount

scala的Actor是基於事件模型的,具體的模型可以自己查詢資料,這裡根據別人的demo程式碼自己也寫了一個基於Actor的事件模型的多檔案計算WordCount,程式碼中我寫了詳細的註釋,僅供參考

首先在D盤下面建立三個檔案,裡面寫一些單詞用空格分開:

這裡寫圖片描述

具體程式碼如下:

package com.lijie.actor

import scala.actors._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source

/**
  * Created by Lijie on 2017/3/22.
  */
class MyActor extends Actor { //重寫Actor中的act()方法,和java中的run()一樣 override def act(): Unit = { //相當於while(true) 這裡用react可以複用執行緒池 loop { //這是個偏函式,定義兩個模式匹配 react { case MapCalculation(fileName) => { //計算檔案中的word的數量,並用sender把訊息傳送回去 sender ! MapResult(Source.fromFile(fileName).getLines().flatMap(_.split(" "
)).map((_, 1)).toList.groupBy(_._1).mapValues(_.size)) } } } } } //定義一個case class 用來模擬map的word統計 case class MapCalculation(fileName: String) //定義一個case class 用來模擬map的word統計的返回結果封裝 case class MapResult(mapResult: Map[String, Int]) object MyActor { def main(args: Array[String]): Unit = { //用來儲存非同步返回的future
val futures = new mutable.HashSet[Future[Any]]() //用來儲存map返回的值 val listRes = new ListBuffer[Map[String, Int]]() //定義檔案的位置,類似於mapreduce的FileInputFormat.addInputPath val files = Array("D:\\lijietest01.txt", "D:\\lijietest02.log", "D:\\lijietest03.txt") //迴圈向MyActor傳送訊息 for (fileName <- files) { //建立訊息物件 並啟動 且傳送非同步訊息 //每啟動一個傳送一個非同步訊息並返回一個Future,這裡同java中的Callable val actor = new MyActor //傳送非同步訊息並返回Future(! !? !!) val future = actor.start() !! MapCalculation(fileName) //扔到hashSet中 futures += future } //模擬reduce while (futures.size > 0) { //這裡取出已經計算完成的非同步返回 isSet類似於Callable的isDone返回boolean,將計算完的結果放到res val res = futures.filter(_.isSet) for (r <- res) { //這裡的apply()相當於Callable的get(),強轉成MapResult(上面那個case class),取出裡面的map並且放入到一個list中 listRes += r.apply().asInstanceOf[MapResult].mapResult //移除futures已經處理的物件,直到移除完 這個while迴圈就終止 futures -= r //如果檔案很大,最後設定睡眠時間,不然cpu會空跑 Thread.sleep(100) } } //對上面處理完成的map結果進行reduce //其中這個listRes打印出來如下:(因為我這裡三個檔案內容一樣,所以每個map裡面的資料一樣) // ListBuffer( // Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1), // Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1), // Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1) // ) val wordCount = listRes.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)) //測試列印結果 //Map(world -> 3, soga -> 3, scala -> 6, lijie -> 12, abcd -> 3, hello -> 9, nihao -> 3) println(wordCount) } }

執行結果如下:

這裡寫圖片描述