Scala併發程式設計模型AKKA
阿新 • • 發佈:2019-01-06
一、併發程式設計模型AKKA
Spark使用底層通訊框架AKKA 分散式 master worker hadoop使用的是rpc 1)akka簡介 寫併發程式很難,AKKA解決spark這個問題。 akka構建在JVM平臺上,是一種高併發、分散式、並且容錯的應用工具包 akka用scala語言編寫同時提供了scala和java的開發介面 akka可以開發一些高併發程式。 2)Akka的Actor模型 akka處理併發的方法基於actor模型 在基於actor的系統中,所有事物都是actor。 actor作為一個併發模型設計和架構的,面向物件不是。 actor與actor之間只能通過訊息通訊。 Akka特點: (1)對併發模型進行了更高的抽象 (2)非同步、非阻塞、高效能的事件驅動程式設計模型 (3)輕量級事件處理(1G記憶體可以容納百萬級別的Actor) 同步:阻塞(發訊息 一直等待訊息) 非同步:不阻塞(發訊息 不等待 該幹嘛幹嘛) actor簡化了併發程式設計,提高了程式效能。
1、Actor模型
2、Actor工作機制
二、AKKA程式設計
1、需求 我發訊息,自己收
object CallMe { //1.建立ActorSystem 用ActorSystem建立Actor private val acFactory = ActorSystem("AcFactory")//2.Actor傳送訊息通過ActorRef private val callRef = acFactory.actorOf(Props[CallMe],"CallMe") def main(args: Array[String]): Unit = { //3.傳送訊息 callRef ! "你吃飯了嗎" callRef ! "很高興見到你" callRef ! "stop" } } class CallMe extends Actor{ //Receive使用者接收訊息並且處理訊息 override def receive: Receive = {case "你吃飯了嗎" => println("吃的雞腿") case "很高興見到你" => println("我也是") case "stop" => { //關閉代理ActorRef context.stop(self) //關閉ActorSystem context.system.terminate() } } }
結果:
2.需求 一個Actor傳送訊息,另外一個Actor接收訊息
(1)TomActor
import akka.actor.Actor class TomActor extends Actor{ override def receive: Receive = { case "你好,我是John" => { println("你好,我是Tom") } case "我愛Tom" => { println("Tom也愛John") } } }
(2)JohnActor
import akka.actor.{Actor, ActorRef} class JohnActor(val h:ActorRef) extends Actor{ override def receive: Receive = { case "你好,我是John" => { //John傳送訊息給TomActor h ! "我愛Tom" } } }
(3)QqDriver
import akka.actor.{ActorSystem, Props} object QqDriver { //1.建立ActorSystem 用ActorSystem建立Actor private val qqFactory = ActorSystem("QqFactory") //2.Actor傳送訊息通過ActorRef private val hRef = qqFactory.actorOf(Props[TomActor],"Tom") //John需要接受Tom傳送的訊息 private val dRef = qqFactory.actorOf(Props(new JohnActor(hRef)),"John") def main(args: Array[String]): Unit = { //1.Tom自己給自己傳送訊息 //hRef ! "我愛Tom" //2John給Tom傳送訊息 dRef ! "你好,我是John" } }
(4)結果
3、maven依賴pom檔案
<!-- 定義版本常量 --> <properties> <encoding>UTF-8</encoding> <scala.version>2.11.8</scala.version> <scala.compat.version>2.11</scala.compat.version> <akka.version>2.4.17</akka.version> </properties> <dependencies> <!-- 新增scala包的依賴 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- 新增akka包的actor依賴 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_${scala.compat.version}</artifactId> <version>${akka.version}</version> </dependency> <!-- 多程序之間的Actor通訊設定 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_${scala.compat.version}</artifactId> <version>${akka.version}</version> </dependency> </dependencies> <!-- 指定使用外掛--> <build> <!-- 指定原始碼包和測試包的位置資訊 --> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <!-- 指定編譯scala的外掛 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <!-- maven打包使用的外掛 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <!-- 指定main方法 --> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.itstaredu.spark.SparkWorker</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>