1. 程式人生 > >Akka實現WordCount(Scala)

Akka實現WordCount(Scala)

post aggregate compiler lan www word you eof current

Akka實現WordCount(Scala):

架構圖:

技術分享圖片

項目結構:

技術分享圖片

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.citi.sky</groupId>
	<artifactId>AkkaPJ</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>AkkaPJ</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
	
	
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.12</version>
			<scope>test</scope>
		</dependency>
	
	  <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.6</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.6</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.6</version>
        </dependency>
	
	
		<dependency>
			<groupId>com.typesafe.akka</groupId>
			<artifactId>akka-actor_2.11</artifactId>
			<version>2.3.3</version>

		</dependency>

		<dependency>
			<groupId>com.typesafe.akka</groupId>
			<artifactId>akka-testkit_2.11</artifactId>
			<version>2.3.6</version>
			<scope>test</scope>
		</dependency>
		
		<dependency>
			<groupId>org.scalatest</groupId>
			<artifactId>scalatest_2.11</artifactId>
			<version>3.0.4</version>
			<scope>test</scope>
		</dependency>



	</dependencies>
	
	  <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
	
</project>

消息:

case class MapData (dataList: List[WordCount])

case class ReduceData (reduceDataList: Map[String, Int])

case class Result()

case class WordCount (key: String, count: Int)

Actors:

MasterActor

import akka.actor.Actor
import akka.actor.Props
import com.citi.dw.messages.Result

class MasterActor extends Actor {

  private val aggregateActor = context.actorOf(Props(classOf[AggregateActor]), "aggregateActor")
  private val reduceActor = context.actorOf(Props(classOf[ReduceActor], aggregateActor), "reduceActor")
  private val mapActor = context.actorOf(Props(classOf[MapActor], reduceActor), "mapActor")
  
  

  def receive: Actor.Receive = {
    case msg: String => {
      mapActor ! msg
    }
    case msg: Result => {
      aggregateActor.forward(msg)
    }
//    case msg: Map[String, Int] => 
    case _ => println("MasterActor receive wrong message.")
  }
}

MapActor:

import akka.actor.Actor
import com.citi.dw.messages.MapData
import com.citi.dw.messages.WordCount
import scala.collection.mutable.ListBuffer
import akka.actor.ActorRef

class MapActor(val reduceActor: ActorRef) extends Actor {
  def receive: Actor.Receive = {
    case msg: String => {
      val mapData = evaluateExpression(msg)
      reduceActor ! mapData
    }
    case _ => println("MapActor receive wrong message.")
  }
  
  private[this] def evaluateExpression(line: String): MapData = {
    val dataList = ListBuffer[WordCount]()
    line.split(" ").map(word => dataList += WordCount(word, 1))
    
//    val wordArr = line.split(" ")
//    for(word <- wordArr) {
//       dataList += WordCount(word, 1)
//    }
//    println(dataList)
    MapData(dataList.toList)
  }
  
  
}

ReduceActor:

import akka.actor.Actor
import com.citi.dw.messages.MapData
import com.citi.dw.messages.ReduceData
import com.citi.dw.messages.WordCount
import scala.collection.mutable.HashMap
import akka.actor.ActorRef

class ReduceActor(val aggregateActor: ActorRef) extends Actor {

  def receive: Actor.Receive = {
    case msg: MapData => {
      val reduceData = reduce(msg.dataList)
      aggregateActor ! reduceData
    }
    case _ => println("ReduceActor receive wrong message.")
  }

  private[this] def reduce(dataList: List[WordCount]): ReduceData = {
    val reduceMap = HashMap[String, Int]()

    for (wc <- dataList) {
      wc match {
        case WordCount(key, count) if reduceMap.contains(key) => {
          val localSumCount = reduceMap.get(key).get + count
          reduceMap += ((key, localSumCount))
          //          println(reduceMap)
        }
        case WordCount(key, count) => {
          reduceMap += ((key, 1))
          //          println(reduceMap)
        }
      }

    }

    ReduceData(reduceMap.toMap)
  }

}

AggregateActor:

import akka.actor.Actor
import com.citi.dw.messages.ReduceData
import scala.collection.mutable.HashMap
import com.citi.dw.messages.Result
import akka.actor.ActorRef

class AggregateActor extends Actor {

  private[this] var finalReduceMap = HashMap[String, Int]()

  def receive: Actor.Receive = {
    case msg: ReduceData => {
      aggregateAndReduce(msg.reduceDataList)
    }
    case msg: Result => {
//      println(f"Result: ${finalReduceMap}")
//      sender().tell(finalReduceMap.toMap, ActorRef.noSender)
      sender ! finalReduceMap.toMap
    }
    case _ => println("AggregateActor receive wrong message.")
  }

  private[this] def aggregateAndReduce(reduceList: Map[String, Int]) = {
//   println(s"final: ${finalReduceMap}")
    for (key <- reduceList.keys) {
      if (finalReduceMap.contains(key)) {
     
        val count = finalReduceMap.get(key).get + reduceList.get(key).get
        finalReduceMap += ((key, count))
      } else {
        finalReduceMap += ((key, reduceList.get(key).get))
      }
    }

  }

}

主程序:

import akka.actor.ActorSystem
import akka.actor.Props
import com.citi.dw.actors.MasterActor
import com.citi.dw.messages.Result
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
import scala.util._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await

object AkkaWordCount extends App {

  implicit val timeout = Timeout(5 seconds)
  val system = ActorSystem("WordCountAkka")
  val master = system.actorOf(Props(classOf[MasterActor]), "master")

  master ! "Hi! Hi!"
  master ! ("My name is Sky. I am so so so happy to be here ")
  master ! ("Today, I am going to introduce word count for Akka ")
  master ! ("I hope hope It is helpful to you ")
  master ! ("Thank you ")

  Thread.sleep(1000)

  val future = master ? Result()
//  future.onComplete({
//    case Success(x: String) => println(x)
//    case Failure(t)         => println(t)
//    case msg                => println("unknown message! " + msg)
//  })

  val result = Await.result(future, timeout.duration).asInstanceOf[Map[String, Int]]
  result.map(m => println(m._1, m._2))


  system.shutdown()

}

運行結果:

(for,1)
(name,1)
(count,1)
(is,2)
(am,2)
(My,1)
(going,1)
(so,3)
(introduce,1)
(Sky.,1)
(I,3)
(to,3)
(Hi!,2)
(you,2)
(here,1)
(happy,1)
(Thank,1)
(hope,2)
(Today,,1)
(helpful,1)
(Akka,1)
(It,1)
(be,1)
(word,1)

 

Akka實現WordCount(Scala)