1. 程式人生 > >Scala使用Actor進行併發程式設計

Scala使用Actor進行併發程式設計

Akka 是一個用 Scala 編寫的庫,用於簡化編寫容錯的、高可伸縮性的 Java 和 Scala 的 Actor 模型應用。 
Actor模型並非什麼新鮮事物,它由Carl Hewitt於上世紀70年代早期提出,目的是為了解決分散式程式設計中一系列的程式設計問題。其特點如下:

  • 系統中的所有事物都可以扮演一個Actor
  • Actor之間完全獨立
  • 在收到訊息時Actor所採取的所有動作都是並行的,在一個方法中的動作沒有明確的順序
  • Actor由標識和當前行為描述
  • Actor可能被分成原始(primitive)和非原始(non primitive)類別
  • 非原始Actor有
    • 由一個郵件地址表示的標識
    • 當前行為由一組知識(acquaintances)(例項變數或本地狀態)和定義Actor在收到訊息時將採取的動作組成
  • 訊息傳遞是非阻塞和非同步的,其機制是郵件佇列(mail-queue)
  • 所有訊息傳送都是並行的

使用Actor

Actor是Scala的併發模型。在2.10之後的版本中,使用http://akka.io/作為其推薦Actor實現。 
Actor是類似執行緒的實體,有一個郵箱。 
可以通過system.actorOf來建立,receive獲取郵箱訊息,!向郵箱傳送訊息。 這個例子是一個EchoServer,接受資訊並列印。

  1. import akka.actor.{Actor,ActorSystem,Props}
  2. val system =ActorSystem()
  3. classEchoServerextendsActor
    {
  4. def receive ={
  5. case msg:String=> println("echo "+ msg)
  6. }
  7. }
  8. val echoServer = system.actorOf(Props[EchoServer])
  9. echoServer !"hi"
  10. system.shutdown

Actor更簡化的用法

可以通過更簡化的辦法宣告Actor。 匯入akka.actor.ActorDSL中的actor函式。 這個函式可以接受一個Actor的構造器Act,啟動並返回Actor。

  1. import akka.actor.ActorDSL._
  2. import akka.actor.ActorSystem
  3. implicit val system =ActorSystem()
  4. val echoServer = actor(newAct{
  5. become {
  6. case msg => println("echo "+ msg)
  7. }
  8. })
  9. echoServer !"hi"
  10. system.shutdown

Actor原理

Actor比執行緒輕量。在Scala中可以建立數以百萬級的Actor。奧祕在於Actor可以複用執行緒。 
Actor和執行緒是不同的抽象,他們的對應關係是由Dispatcher決定的。 
這個例子建立4個Actor,每次呼叫的時候列印自身執行緒。 可以發現Actor和執行緒之間沒有一對一的對應關係。 一個Actor可以使用多個執行緒,一個執行緒也會被多個Actor複用。

  1. import akka.actor.{Actor,Props,ActorSystem}
  2. import akka.testkit.CallingThreadDispatcher
  3. implicit val system =ActorSystem()
  4. classEchoServer(name:String)extendsActor{
  5. def receive ={
  6. case msg => println("server"+ name +" echo "+ msg +
  7. " by "+Thread.currentThread().getName())
  8. }
  9. }
  10. val echoServers =(1 to 10).map(x =>
  11. system.actorOf(Props(newEchoServer(x.toString))
  12. .withDispatcher(CallingThreadDispatcher.Id)))
  13. (1 to 10).foreach(msg =>
  14. echoServers(scala.util.Random.nextInt(10))! msg.toString)
  15. system.shutdown

同步返回

Actor非常適合於較耗時的操作。比如獲取網路資源。 
這個例子通過呼叫ask函式來獲取一個Future。 在Actor內部通過 sender ! 傳遞結果。 Future像Option一樣有很多高階方法,可以使用foreach檢視結果。

  1. import akka.actor.ActorDSL._
  2. import akka.pattern.ask
  3. implicit val ec = scala.concurrent.ExecutionContext.Implicits.global
  4. implicit val system = akka.actor.ActorSystem()
  5. val versionUrl ="https://raw.github.com/scala/scala/master/starr.number"
  6. val fromURL = actor(newAct{
  7. become {
  8. case url:String=> sender ! scala.io.Source.fromURL(url)
  9. .getLines().mkString("\n")
  10. }
  11. })
  12. val version = fromURL.ask(versionUrl)(akka.util.Timeout(5*1000))
  13. version.foreach(println _)
  14. system.shutdown

非同步返回

非同步操作可以最大發揮效能。Scala的Futrue很強大,可以非同步返回。 可以實現Futrue的onComplete方法。當Futrue結束的時候就會回撥。 在呼叫ask的時候,可以設定超時。

  1. import akka.actor.ActorDSL._
  2. import akka.pattern.ask
  3. implicit val ec = scala.concurrent.ExecutionContext.Implicits.global
  4. implicit val system = akka.actor.ActorSystem()
  5. val versionUrl ="https://raw.github.com/scala/scala/master/starr.number"
  6. val fromURL = actor(newAct{
  7. become {
  8. case url:String=> sender ! scala.io.Source.fromURL(url)
  9. .getLines().mkString("\n")
  10. }
  11. })
  12. val version = fromURL.ask(versionUrl)(akka.util.Timeout(5*1000))
  13. version onComplete {
  14. case msg => println(msg); system.shutdown
  15. }

併發集合

這個例子是訪問若干URL,並記錄時間。 如果能併發訪問,就可以大幅提高效能。 嘗試將urls.map修改為urls.par.map。這樣每個map中的函式都可以併發執行。 當函式式和併發結合,就會這樣讓人興奮。

  1. import scala.io.Codec
  2. import java.nio.charset.CodingErrorAction
  3. implicit val codec =Codec("UTF-8")
  4. codec.onMalformedInput(CodingErrorAction.REPLACE)
  5. codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
  6. val urls ="http://scala-lang.org"::"https://github.com/yankay/scala-tour"::Nil
  7. def fromURL(url:String)= scala.io.Source.fromURL(url).getLines().mkString("\n")
  8. val s =System.currentTimeMillis()
  9. time(urls.map(fromURL(_)))
  10. println("time: "+(System.currentTimeMillis - s)+"ms")

相關推薦

concurrent.futures進行併發程式設計

Python中進行併發程式設計一般使用threading和multiprocessing模組,不過大部分的併發程式設計任務都是派生一系列執行緒,從佇列中收集資源,然後用佇列收集結果。在這些任務中,往往需要生成執行緒池,concurrent.futures模組對threading和multiprocessing

Scala使用Actor進行併發程式設計

Akka 是一個用 Scala 編寫的庫,用於簡化編寫容錯的、高可伸縮性的 Java 和 Scala 的 Actor 模型應用。  Actor模型並非什麼新鮮事物,它由Carl Hewitt於上世紀70年代早期提出,目的是為了解決分散式程式設計中一系列的程式設計問題。其

python教程:使用 async 和 await 協程進行併發程式設計

python 一直在進行併發程式設計的優化, 比較熟知的是使用 thread 模組多執行緒和 multiprocessing 多程序,後來慢慢引入基於 yield 關鍵字的協程。 而近幾個版本,python 對於協程的寫法進行了大幅的優化,很多之前的協程寫法不被官方推薦了。如果你之前瞭解過 python 協程

python 使用多程序實現併發程式設計/使用queue進行程序間資料交換

import time import os import multiprocessing from multiprocessing import Queue, pool """ 一.Python 使用多程序實現併發程式設計: 因為cpython直譯器中有GIL存在的原因(每個程序都會維護一

漫談併發程式設計:用MPI進行分散式記憶體程式設計(入門篇)

0x00 前言 本篇是MPI的入門教程,主要是為了簡單地瞭解MPI的設計和基本用法,方便和現在的Hadoop、Spark做對比,並嘗試理解它們之間在設計上有什麼區別。 身處Hadoop、Spark這些優秀的分散式開發框架蓬勃發展的今天,老的分散式程式

Linux下網路程式設計之自定義協議進行併發多客戶端與伺服器的通訊(多程序處理併發)不足佔用資源太多

自定義協議訊息體*********msg.h*************#ifndef _MSG_H_#define _MSG_H_struct msg{ char head[10]; //頭部 char msg_chck; //效驗碼 char buff[512];/

併發程式設計面試必備:JUC 中的 Atomic 原子類總結

個人覺得這一節掌握基本的使用即可! 本節思維導圖: 1 Atomic 原子類介紹 Atomic 翻譯成中文是原子的意思。在化學上,我們知道原子是構成一般物質的最小單位,在化學反應中是不可分割的。在我們這裡 Atomic 是指一個操作是不可中斷的。即使是在多個執行緒一起執行的時

【轉】Java併發程式設計:同步容器

  為了方便編寫出執行緒安全的程式,Java裡面提供了一些執行緒安全類和併發工具,比如:同步容器、併發容器、阻塞佇列、Synchronizer(比如CountDownLatch)。今天我們就來討論下同步容器。   一、為什麼會出現同步容器?   在Java的集合容器框架中,主要有四大類別:Li

《JAVA併發程式設計實戰》避免活躍性危險

文章目錄 死鎖 鎖順序死鎖 動態的鎖順序死鎖 在協作物件之間發生的死鎖 開放呼叫 資源死鎖 死鎖的避免和診斷 支援定時的鎖 使用執行緒轉儲資訊來分析死鎖 其他活躍性危

《JAVA併發程式設計實戰》取消和關閉

文章目錄 引言 任務取消 中斷 中斷策略 響應中斷 示例:計時執行 通過Future來實現取消 處理不可中斷的阻塞 採用newTaskFor封裝非標準的取消 停止基於執行緒的服務

《JAVA併發程式設計實戰》任務執行

文章目錄 線上程中執行任務 序列執行任務 顯式的為任務建立執行緒 無限制建立執行緒的不足 Executor框架 示例:基於Executor的Web伺服器 執行策略 執行緒池 Exe

《JAVA併發程式設計實戰》基礎構建模組

文章目錄 同步容器類 同步容器類的問題 迭代器和ConcurrentModificationException 隱藏迭代器 併發容器 ConcurrentHashMap 額外的原子Map操作

《JAVA併發程式設計實戰》物件的組合

文章目錄 設計執行緒安全的類 找出構成物件狀態的所有變數 示例 找出約束狀態變數的不變性條件 例項封閉 java監視器模式 示例:車輛追蹤 執行緒安全性的委託

java併發程式設計之利用CAS保證操作的原子性

import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class Counter { private AtomicInteger at

Java併發程式設計之CyclicBarrier

CyclicBarrier可以控制這樣的場景: 對多個執行緒,他們執行自己程式碼(執行run方法)的時間不一樣; 比如有3個執行緒,其run方法執行時間分別為1s, 2s, 3s。如果我們想在三個執行緒都完成自己的任務時執行相應的操作,CyclicBarrier就派上用場了。 寫了一

Java併發程式設計高階技術-高效能併發框架原始碼解析與實戰(資源同步)

第1章 課程介紹(Java併發程式設計進階課程) 什麼是Disruptor?它一個高效能的非同步處理框架,號稱“單執行緒每秒可處理600W個訂單”的神器,本課程目標:徹底精通一個如此優秀的開源框架,面試秒殺面試官。本章會帶領小夥伴們先了解課程大綱與重點,然後模擬千萬,億級資料進行壓力測試。讓大

併發程式設計之多執行緒執行緒安全

什麼是執行緒安全? 為什麼有執行緒安全問題? 當多個執行緒同時共享,同一個全域性變數或靜態變數,做寫的操作時,可能會發生資料衝突問題,也就是執行緒安全問題。但是做讀操作是不會發生資料衝突問題。 案例: 需求現在有100張火車票,有兩個視窗同時搶火車票,請使用多執行緒模擬搶票效果。 p

併發程式設計之多執行緒基礎

執行緒與程序區別 每個正在系統上執行的程式都是一個程序。每個程序包含一到多個執行緒。執行緒是一組指令的集合,或者是程式的特殊段,它可以在程式裡獨立執行。也可以把它理解為程式碼執行的上下文。所以執行緒基本上是輕量級的程序,它負責在單個程式裡執行多工。通常由作業系統負責多個執行緒的排程和執行。

併發程式設計經歷:同步加鎖之業務鎖

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

併發併發程式設計的挑戰

1. 上下文切換 a. 概念: i. CPU通過時間片分配演算法來迴圈執行任務,當前任務執行一個時間片後會切換到下一個任務。但是,在切換前會儲存上一個任務的狀態,以便下次切換回這個任務時,可以再載入這個任務的狀態。所以任務從儲存到再載入的過程就是一次上下文切換 b. 舉