Scala併發程式設計基礎
轉載作者:搖擺少年夢
轉載地址:https://blog.csdn.net/lovehuangjiaju/article/details/47623177
本節主要內容
- Scala併發程式設計簡介
- Scala Actor併發程式設計模型
- react模型
- Actor的幾種狀態
- Actor深入使用解析
1. Scala併發程式設計簡介
2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行業中最不可告人的一個小祕密,他明確論證了處理器在速度上的發展已經走到了盡頭,並且將由全新的單晶片上的並行 “核心”(虛擬 CPU)所取代。這一發現對程式設計社群造成了不小的衝擊,因為正確建立執行緒安全的程式碼,在理論而非實踐中,始終會提高高效能開發人員的身價,而讓各公司難以聘用他們。看上去,僅有少數人充分理解了 Java 的執行緒模型、併發 API 以及 “同步” 的含義,以便能夠編寫同時提供安全性和吞吐量的程式碼 —— 並且大多數人已經明白了它的困難所在(來源:
在Java中,要編寫一個執行緒安全的程式並不是一件易事,例如:
class Account {
private int balance;
synchronized public int getBalance() {
return balance;
}
synchronized public void incrementBalance() {
balance++;
}
}
上面這段java程式碼雖然方法前面加了synchronized ,但它仍然不是執行緒安全的,例如,在執行下面兩個語句
account.incrementBalance();
account.getBalance();
時,有可能account.incrementBalance()執行完成後,其它執行緒可能會獲取物件的鎖,修改account的balance,從而造成得不到預期結果的問題。解決問題的方法是將兩個功能結合起來形成一個方法:
synchronized public int incrementAndGetBalance() {
balance++;
return balance;
}
但這可能並不是我們想要的,每次獲取balance都要將balance增加, 這顯然與實際不符。除此之外,java中的併發程式設計可能還會經常遇到死鎖問題,而這個問題往往難除錯,問題可能會隨機性的出現。總體上來看,java的併發程式設計模型相對較複雜,難以駕馭。
Scala很好地解決了java併發程式設計的問題,要在scala中進行併發程式設計,有以下幾種途徑可以實現:
1 actor訊息模型、akka actor併發模型。
2 Thread、Runnable
3 java.util.concurennt
4 第三方開源併發框架如Netty,Mina
在上述四種途徑當中,利用 actor訊息模型、akka actor併發模型是scala併發程式設計的首先,本節主要介紹actor訊息模型,akka actor併發模型我們將放在後面的章節中介紹。
在scala中,通過不變物件來實現執行緒安全,涉及到修改物件狀態時,則建立一個新的物件來實現,如:
//成員balance狀態一旦被賦值,便不能更改
//因而它也是執行緒安全的
class Person(val age: Integer) {
def getAge() = age
}
object Person{
//建立新的物件來實現物件狀態修改
def increment(person: Person): Person{
new Person(Person.getAge() + 1)
}
}
通過不變物件實現併發程式設計,可以簡化程式設計模型,使併發程式更容易現實和控制。
2.Scala Actor併發程式設計模型
java中的併發主要是通過執行緒來實現,各執行緒採用共享資源的機制來實現程式的併發,這裡面臨競爭資源的問題,雖然採用鎖機制可以避免競爭資源的問題,但會存在死鎖問題,要開發一套健壯的併發應用程式具有一定的難度。而scala的併發模型相比於java它更簡單,它採用訊息傳遞而非資源共享來實現程式的併發,訊息傳遞正是通過Actor來實現的。下面的程式碼給出了Actor使用示例
//混入Actor特質,然後實現act方法
//如同java中的Runnable介面一樣
//各執行緒的run方法是併發執行的
//Actor中的act方法也是併發執行的
class ActorDemo extends Actor{
//實現 act()方法
def act(){
while(true){
//receive從郵箱中獲取一條訊息
//然後傳遞給它的引數
//該引數是一個偏函式
receive{
case "actorDemo" => println("receive....ActorDemo")
}
}
}
}
object ActorDemo extends App{
val actor=new ActorDemo
//啟動建立的actor
actor.start()
//主執行緒傳送訊息給actor
actor!"actorDemo"
}
下面給的是recieve方法的部分原始碼
def receive[R](f: PartialFunction[Any, R]): R = {
assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
synchronized {
if (shouldExit) exit() // links
drainSendBuffer(mailbox)
}
var done = false
while (!done) {
val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
senders = replyTo :: senders
val matches = f.isDefinedAt(m)
senders = senders.tail
matches
})
................
從上述程式碼中不能看出,receive方法接受的引數是一個偏函式,並且是通過mailbox來實現訊息的傳送與接收。
在前述的class ActorDemo中,receive方法的引數為
{
case "actorDemo" => println("receive....ActorDemo")
}
該程式碼塊在執行時被轉換為一個PartialFunction[Any, R]的偏函式,其中R是偏函式的返回型別,對應case 語句=> 右邊的部分,在本例子中R是Unit型別,而Any對應的則對應case語句的模式部分。
前面給的是通過extends Actor的方式來建立一個Actor類,其實scala.actors.Actor中提供了一個actor工具方法,可以非常方便地直接建立Actor物件如:
import scala.actors.Actor._
object ActorFromMethod extends App{
//通過工具方法actor直接建立Actor物件
val methodActor = actor {
for (i <- 1 to 5)
println("That is the question.")
Thread.sleep(1000)
}
}
上述程式碼建立的actor物件無需呼叫start方法,物件建立完成後會立即執行。
scala中本地執行緒也可用作Actor,下面的程式碼演示瞭如何在REPL命令列中將本地執行緒當作Actor;
scala> import scala.actors.Actor._
import scala.actors.Actor._
//self引用本地執行緒,併發送訊息
scala> self ! "hello"
//接收訊息
scala> self.receive { case x:String => x }
res1: String = hello
上述程式碼中,如果傳送的訊息不是String型別的,執行緒將被阻塞,為避免這個問題,可以採用receiveWithin方法,
scala> self ! 123
scala> self.receiveWithin(1000) { case x => x }
res6: Any = 123
scala> self.receiveWithin(1000) { case x => x }
res7: Any = TIMEOUT
- react模型
這裡給出另外一個作者的react介紹,更通俗易懂:react
scala中的Actor也是構建在java執行緒基礎之上的,前面在使用Actor時都是通過建立Actor物件,然後再呼叫act方法來啟動actor。我們知道,java中執行緒的建立、銷燬及執行緒間的切換是比較耗時的,因此實際中儘量避免頻繁的執行緒建立、銷燬和銷燬。Scala中提供react方法,在方法執行結束後,執行緒仍然被保留。下面的程式碼演示了react方法的使用:
package cn.scala.xtwy.concurrency
import scala.actors._
object NameResolver extends Actor {
import java.net.{ InetAddress, UnknownHostException }
def act() {
react {
//匹配主執行緒發來的("www.scala-lang.org", NameResolver)
case (name: String, actor: Actor) =>
//向actor傳送解析後的IP地址資訊
//由於本例中傳進來的actor就是NameResolver自身
//也即自己給自己傳送訊息,並存入將訊息存入郵箱
actor ! getIp(name)
//再次呼叫act方法,試圖從郵箱中提取資訊
//如果郵箱中資訊為空,則進入等待模式
act()
case "EXIT" =>
println("Name resolver exiting.")
// quit
//匹配郵箱中的單個資訊,本例中會匹配郵箱中的IP地址資訊
case msg =>
println("Unhandled message: " + msg)
act()
}
}
def getIp(name: String): Option[InetAddress] = {
try {
Some(InetAddress.getByName(name))
} catch {
case _: UnknownHostException => None
}
}
}
object Main extends App{
NameResolver.start()
//主執行緒向NameResolver傳送訊息("www.scala-lang.org", NameResolver)
NameResolver ! ("www.scala-lang.org", NameResolver)
NameResolver ! ("wwwwww.scala-lang.org", NameResolver)
}
從上述程式碼中可以看到,通過在react方法執行結束時加入act方法,方法執行完成後沒有被銷燬,而是繼續試圖從郵箱中獲取資訊,獲取不到則等待。
4. Actor的幾種狀態
Actor有下列幾種狀態:
- 初始狀態(New),Actor物件被建立,但還沒有啟動即沒有執行start方法時的狀態
- 執行狀態(Runnable),正在執行時的狀態
- 掛起狀態(Suspended),在react方法中等待時的狀態
- 時間點掛起狀態(TimedSuspended),掛起狀態的一種特殊形式,reactWithin方法中的等待時的狀態
- 阻塞狀態(Blocked),在receive方法中阻塞等待時的狀態
- 時間點阻塞狀態(TimedBlocked),在receiveWithin方法中阻塞等待時的狀態
- 結束狀態(Terminated),執行完成後被銷燬
5. Actor深入使用解析
1 receive方法單次執行:
object Actor2
{
case class Speak(line : String)
case class Gesture(bodyPart : String, action : String)
case class NegotiateNewContract()
def main(args : Array[String]) =
{
val badActor =
actor
{
//這裡receive方法只會匹配一次便結束
receive
{
case NegotiateNewContract =>
System.out.println("I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(line)
case Gesture(bodyPart, action) =>
System.out.println("(" + action + "s " + bodyPart + ")")
case _ =>
System.out.println("Huh? I'll be in my trailer.")
}
}
//receive方法只處理下面這條語句傳送的訊息
badActor ! NegotiateNewContract
//下面其餘的訊息不會被處理
badActor ! Speak("Do ya feel lucky, punk?")
badActor ! Gesture("face", "grimaces")
badActor ! Speak("Well, do ya?")
}
}
上述程式碼只會輸出:
I won’t do it for less than $1 million!
即後面傳送的訊息如:
badActor ! Speak(“Do ya feel lucky, punk?”)
badActor ! Gesture(“face”, “grimaces”)
badActor ! Speak(“Well, do ya?”)
不會被處理。這是因為receive方法的單次執行問題。
2 能夠處理多個訊息的receive方法:
object Actor2
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract()
//處理結束訊息
case class ThatsAWrap()
def main(args : Array[String]) =
{
val badActor =
actor
{
var done = false
//while迴圈
while (! done)
{
receive
{
case NegotiateNewContract =>
System.out.println("I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(line)
case Gesture(bodyPart, action) =>
System.out.println("(" + action + "s " + bodyPart + ")")
case ThatsAWrap =>
System.out.println("Great cast party, everybody! See ya!")
done = true
case _ =>
System.out.println("Huh? I'll be in my trailer.")
}
}
}
//下面所有的訊息都能被處理
badActor ! NegotiateNewContract
badActor ! Speak("Do ya feel lucky, punk?")
badActor ! Gesture("face", "grimaces")
badActor ! Speak("Well, do ya?")
//訊息傳送後,receive方法執行完畢
badActor ! ThatsAWrap
}
}
3 Actor後面實現原理仍然是執行緒的證據
object Actor3
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract;
case class ThatsAWrap;
def main(args : Array[String]) =
{
def ct =
"Thread " + Thread.currentThread().getName() + ": "
val badActor =
actor
{
var done = false
while (! done)
{
receive
{
case NegotiateNewContract =>
System.out.println(ct + "I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(ct + line)
case Gesture(bodyPart, action) =>
System.out.println(ct + "(" + action + "s " + bodyPart + ")")
case ThatsAWrap =>
System.out.println(ct + "Great cast party, everybody! See ya!")
done = true
case _ =>
System.out.println(ct + "Huh? I'll be in my trailer.")
}
}
}
System.out.println(ct + "Negotiating...")
badActor ! NegotiateNewContract
System.out.println(ct + "Speaking...")
badActor ! Speak("Do ya feel lucky, punk?")
System.out.println(ct + "Gesturing...")
badActor ! Gesture("face", "grimaces")
System.out.println(ct + "Speaking again...")
badActor ! Speak("Well, do ya?")
System.out.println(ct + "Wrapping up")
badActor ! ThatsAWrap
}
}
執行結果如下:
Thread main: Negotiating...
Thread main: Speaking...
Thread main: Gesturing...
Thread main: Speaking again...
Thread main: Wrapping up
Thread ForkJoinPool-1-worker-13: I won't do it for less than $1 million!
Thread ForkJoinPool-1-worker-13: Do ya feel lucky, punk?
Thread ForkJoinPool-1-worker-13: (grimacess face)
Thread ForkJoinPool-1-worker-13: Well, do ya?
Thread ForkJoinPool-1-worker-13: Great cast party, everybody! See ya!
從上述執行結果可以看到,Actor最終的實現仍然是執行緒,只不過它提供的程式設計模型與java中的程式設計模型不一樣而已。
4 利用!?傳送同步訊息,等待返回值
import scala.actors._,Actor._
object ProdConSample2
{
case class Message(msg : String)
def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("Received message! -> " + msg)
done = (msg == "DONE")
reply("Already RECEIVED....."+msg)
}
}
}
System.out.println("Sending....")
//獲取響應值
val r= consumer !? "Mares eat oats"
println("replyed message"+r)
System.out.println("Sending....")
consumer !? "Does eat oats"
System.out.println("Sending....")
consumer !? "Little lambs eat ivy"
System.out.println("Sending....")
consumer !? "Kids eat ivy too"
System.out.println("Sending....")
consumer !? "DONE"
}
}
程式碼執行結果:
Sending....
Received message! -> Mares eat oats
replyed messageAlready RECEIVED.....Mares eat oats
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Sending....
Received message! -> Kids eat ivy too
Sending....
Received message! -> DONE
通過上述程式碼執行結果可以看到,!?因為是同步訊息,傳送完返回結果後才會接著傳送下一條訊息。
5 Spawn方法傳送訊息
object ProdConSampleUsingSpawn
{
import concurrent.ops._
def main(args : Array[String]) : Unit =
{
// Spawn Consumer
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("MESSAGE RECEIVED: " + msg)
done = (msg == "DONE")
reply("RECEIVED")
}
}
}
// Spawn Producer
spawn //spawn是一個定義在current.ops中的方法
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too",
"DONE"
);
importantInfo.foreach((msg) => consumer !? msg)
}
}
}
6 !! 傳送非同步訊息,返回值是 Future[Any]
object ProdConSample3
{
case class Message(msg : String)
def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("Received message! -> " + msg)
done = (msg == "DONE")
reply("Already RECEIVED....."+msg)
}
}
}
System.out.println("Sending....")
//傳送非同步訊息,返回
val replyFuture= consumer !! "Mares eat oats"
val r=replyFuture()
println("replyed message*****"+r)
System.out.println("Sending....")
consumer !! "Does eat oats"
System.out.println("Sending....")
consumer !! "Little lambs eat ivy"
System.out.println("Sending....")
consumer !! "Kids eat ivy too"
System.out.println("Sending....")
consumer !! "DONE"
}
}
執行結果:
Sending....
Received message! -> Mares eat oats
replyed message*****Already RECEIVED.....Mares eat oats
Sending....
Sending....
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Received message! -> Kids eat ivy too
Received message! -> DONE
通過上述程式碼的執行結果可以看到,!!的訊息傳送是非同步的,訊息傳送後無需等待結果返回便執行下一條語句,但如果要獲取非同步訊息的返回值,如:
val replyFuture= consumer !! "Mares eat oats"
val r=replyFuture()
則執行到這兩條語句的時候,程式先被阻塞,等獲得結果之後再發送其它的非同步訊息。
7 loop方法實現react
object LoopReact extends App{
val a1 = Actor.actor {
//注意這裡loop是一個方法,不是關鍵字
//實現型別while迴圈的作用
loop {
react {
//為整型時結束操作
case x: Int=>println("a1 stop: " + x); exit()
case msg: String => println("a1: " + msg)
}
}
}
a1!("我是搖擺少年夢")
a1.!(23)
}
相關推薦
Scala入門到精通——第二十六節 Scala併發程式設計基礎
本節主要內容 Scala併發程式設計簡介 Scala Actor併發程式設計模型 react模型 Actor的幾種狀態 Actor深入使用解析 1. Scala併發程式設計簡介 2003 年,Herb Sutter 在他的文章 “The Fre
Scala併發程式設計基礎
轉載作者:搖擺少年夢轉載地址:https://blog.csdn.net/lovehuangjiaju/article/details/47623177本節主要內容Scala併發程式設計簡介Scala Actor併發程式設計模型react模型Actor的幾種狀態Actor深入
併發程式設計基礎——執行緒狀態,啟動及停止的幾種方式
前言 遙想當年大二,實習面試的時候,面試官一個問題:作業系統最小的排程單元是什麼?當時還沒學過作業系統,只知道程序的概念,於是乎信心滿滿的答道,當然是程序啊,然後......就沒有然後了。 之後再看這個問題,其實就是一個笑話。作業系統排程的最小單元其實是執行緒。現在想想當時,自己大二就敢
併發程式設計基礎——JMM簡介
前言 這篇部落格嘗試針對JMM模型進行總結,並分析volatile和synchronized的一些原理(理解的並不深入) JMM記憶體模型 在談JMM記憶體模型的之前,得先了解JVM記憶體模型。 JVM記憶體模型 JVM在執行程式的時候將自動管理的記憶體劃分為幾個區域。從總體
併發-Java併發程式設計基礎
Java併發程式設計基礎 併發 在電腦科學中,併發是指將一個程式,演算法劃分為若干個邏輯組成部分,這些部分可以以任何順序進行執行,但與最終順序執行的結果一致。併發可以在多核作業系統上顯著的提高程式執行速度。 併發與並行聯絡與區別 這裡參考ErLang之父的解釋,ErLang之父談到了如何向一個5歲小孩解釋併發
《Java併發程式設計的藝術》第4章 Java併發程式設計基礎
Java誕生時就選擇了內建對多執行緒的支援 作業系統執行一個程式時,會為其建立一個程序。 執行緒是作業系統排程的最小單元,都有各自的計數器、堆疊、區域性變數。能訪問共享的記憶體變數。CPU在他們上高速切換,讓人感覺在同步執行。 執行緒會被分到若干時間片,時間片用
網際網路架構學習-第一章 併發程式設計基礎(三)
1 第一章 併發程式設計基礎 1.3 Volatile及原子性 Volatile概念 Volatile關鍵字的主要作用是使變數在多個執行緒間可見。在多執行緒間可以進行變數的變更,使得執行緒間進行資料的共
Java併發程式設計基礎
4.1 執行緒簡介 4.1.1 什麼是執行緒 現代作業系統在執行一個程式時,會為其建立一個程序。例如,啟動一個Java程式,作業系統就會建立一個Java程序。現代作業系統排程的最小單元是執行緒,也叫輕量級程序(LightWeight Process),在一
Java併發程式設計基礎//程序:每個程序都有獨立的程式碼和資料空間(程序上下文),程序間的切換開銷比較大,一個程序包含1-n個執行緒 //執行緒:同一類執行緒共享程式碼和資料空間,每個執行緒擁有獨立的執行棧和程式計
1.實現多執行緒的兩種方式: (1)繼承Thread類; (2)實現Runnable介面 //程序:每個程序都有獨立的程式碼和資料空間(程序上下文),程序間的切換開銷比較大,一個程序包含1-n個執行緒 //執行緒:同一類執行緒共享程式碼和資料空間,每個執行緒擁有獨立的執行
泥瓦匠聊併發程式設計基礎篇:執行緒中斷和終止
1 執行緒中斷 1.1 什麼是執行緒中斷? 執行緒中斷是執行緒的標誌位屬性。而不是真正終止執行緒,和執行緒的狀態無關。執行緒中斷過程表示一個執行中的執行緒,通過其他執行緒呼叫了該執行緒的 interrupt() 方法,使得該執行緒中斷標誌位屬性改變。 深入思考下,執行緒中斷不是去中斷了執行緒,
第69講 scala併發程式設計 react 、loop 程式設計
上一節我們講了,actor的receive 偏函式,它屬於 每請求沒執行緒模式,用完了就銷燬。 有沒有執行緒共享,請看本文。 scala 為了提升效能,有2種共享執行緒方式,一種是使用react ,另一種是 loop方法。 一、 react 方式 程式
scala 併發程式設計
1.Actor的建立、啟動和訊息收發 //相當於java的thread //java多執行緒是共享全域性的加鎖的程式設計機制 /** Actor trait就類似於Java中的Thread和Runnable一樣, 是基礎的多執行緒基類和介面。我們只要重寫
scala併發程式設計第一章習題
1.下面的方法簽名實現一個compose方法 def compose[A,B,C](g:B => C ,f : A => B):A => C = x => g(f(x)) 思路就是上一篇文章說的關於compose和andThe
python中併發程式設計基礎1
併發程式設計基礎概念 1.程序。 什麼是程序? 正在執行的程式就是程序。程式只是程式碼。 什麼是多道? 多道技術: 1.空間上的複用(記憶體)。將記憶體分為幾個部分,每個部分放入一個程式,這樣同一時間在記憶體中就有了多道程式。 2.時間上的複用(CPU的分配)。只有一個CPU,如果程式
python併發程式設計基礎之守護程序、佇列、鎖
併發程式設計2 1.守護程序 什麼是守護程序? 表示程序A守護程序B,當被守護程序B結束後,程序A也就結束。 from multiprocessing import Process import time def task(): print('妃子的一生') time.s
《Java併發程式設計實踐》筆記1——併發程式設計基礎
1.執行緒安全定義: 當多個執行緒訪問一個類時,如果不用考慮這些執行緒在執行時環境下的排程和交替執行,並且不需要額外的同步及在呼叫方程式碼不必做其他的協調,這個類的行為仍然是正確的,那麼這個類就被稱之為是執行緒安全的。簡言之對於執行緒安全類的例項進行順序或併發的一系列操作,
Scala併發程式設計模型AKKA
一、併發程式設計模型AKKA Spark使用底層通訊框架AKKA 分散式 master worker hadoop使用的是rpc 1)akka簡介 寫併發程式很難,AKKA解決spark這個問題。 akka構建在JVM平臺上,是
C++11 併發程式設計基礎(一):併發、並行與C++多執行緒
正文 C++11標準在標準庫中為多執行緒提供了元件,這意味著使用C++編寫與平臺無關的多執行緒程式成為可能,而C++程式的可移植性也得到了有力的保證。另外,併發程式設計可提高應用的效能,這對對效能錙銖必較的C++程式設計師來說是值得關注的。 回到頂部 1. 何為併發 併發指的是兩個或多個獨立的活動在同
移動端併發程式設計基礎篇-阻塞佇列ArrayBlockingQueue&LinkedBlockingQueue
1.BlockingQueue和普通Queue的區別 BlockingQueue阻塞佇列,多執行緒併發的上下文中,take,put,方法會發生阻塞狀態 Queue 普通的Queue如果實現生產者,消費者的阻塞等待,需要自己實現Blocking狀態 2.ArrayB
Scala ------- 併發程式設計(1)
1.併發程式設計名詞解釋(重點) 並行:多個處理器或者是多核的處理器同時處理多個不同的任務 併發:一個處理器同時處理多個任務 同步:某段時間只有一個執行緒執行一個任務 非同步:某段時間多個執