1. 程式人生 > >Scala 並行和併發程式設計-Futures 和 Promises

Scala 並行和併發程式設計-Futures 和 Promises

最近看了《七週七語言:理解多種程式設計泛型》,介紹了七種語言(四種程式設計泛型)的主要特性:基本語法,集合,並行/併發,其中就有 Scala。你不能指望這種書全面介紹,因為其中任何一門語言都夠寫一本書了~

我比較關注並行/併發,但是書中關於 Scala 的併發部分——Actor,程式碼編譯不通過,“Deprecated”,哎,這書點不負責,程式碼也不寫採用編譯器的版本。於是就到 Scala 官網看了一下,即便是官網,也列出了對 Actor 的改進,有些已經不再使用了~

Java 在它的版本 8 之前,函數語言程式設計實在太弱了,不然也不會出現像 Scala 這樣在 JVM 上執行,能夠與 Java 完美融合的語言(估計,Java 在函數語言程式設計在這方面,太落後了,社群已經等不急了,而函數語言程式設計最大的優點是——並行)。

簡介

Future提供了一套高效非阻塞(non-blocking)的方式完成並行操作。其基本思想很簡單,所謂 Future,指的是一類佔位符物件(placeholder object),用於指代某些尚未完成計算的結果。一般,由Future的計算結果都是並行執行的,計算完後再使用。以這種方式組織並行任務,便可以寫出高效、非同步、非阻塞的並行程式碼。

預設情況,future 和 promise 利用回撥(callback)的非阻塞方式,並不是採用典型的阻塞方式。為了在語法和概念層面簡化回撥的使用,Scala 提供了 flatMap、foreach 和 filter 等運算元(combinator),使得我們能夠以非阻塞的方式對future進行組合。當然,future 對於那些必須使用阻塞的情況仍然支援阻塞操作,可以阻塞等待future(不過不鼓勵這樣做)。

一個典型的 future 如下所示:

    val inverseFuture:Future[Matrix]=Future{
    fatMatrix.inverse()// non-blocking long lasting computatio
    }(executionContext)


或是更常用的:
implicit val ec:ExecutionContext=...

val inverseFuture :Future[Matrix]=Future{
fatMatrix.inverse()
}// ec is implicitly passed


這兩個程式碼片段把 fatMatrix.inverse()
的執行委託給 ExecutionContext,在 inverseFuture 中體現計算結果。

Futures

所謂 Future,是一種用於指代某個尚未就緒的值的物件。這個值通常是某個計算過程的結果:

  • 若該計算過程尚未完成,我們就說該Future未完成;
  • 若該計算過程正常結束,或中途丟擲異常,我們就說該Future已完成。

Future 完成分為兩種情況:

  • 當Future帶著某個值而完成時,我們就說該Future帶著計算結果成功完成。
  • 當Future帶著異常而完成時,計算過程中丟擲的異常,我們就說Future因異常而失敗。

Future 具有一個重要的屬性——只能被賦值一次。一旦給定了某個值或某個異常,future物件就變成了不可變物件——無法再被改寫。

建立future物件最簡單的方法是呼叫future方法,開始非同步(asynchronous)計算,並返回儲存有計算結果的futrue。一旦該future計算完成,其結果就變的可用。

注意,Future[T] 是一個型別,表示future物件,而future是一個方法,建立和排程一個非同步計算,並返回一個帶有計算結果的future物件。

下面通過一個例子來展示。

假設,我們使用某個社交網路假想的API獲取某個使用者的朋友列表,我們將開啟一個新對話(session),然後傳送一個獲取特定使用者朋友列表的請求。

    import scala.concurrent._

     

    importExecutionContext.Implicits.global

    val session = socialNetwork.createSessionFor("user", credentials)
    val f:Future[List[Friend]]=Future{
    session.getFriends()

    }

上面,首先匯入 scala.concurrent 包。然後,通過一個假想的 createSessionFor 方法初始化一個向伺服器傳送請求 session 變數。這個請求是通過網路傳送的,所以可能耗時很長。呼叫 getFriends 方法返回 List[Friend]。為了更好的利用CPU,知道響應到達,不應該阻塞(block)程式的其他部分,這個計算應該被非同步排程。future方法就是這樣做的,它並行地執行指定的計算塊,在這個例子中,向伺服器傳送請求,等待響應。

一旦伺服器響應,future f 中的好友列表將變得可用。

失敗可能會導致一個 exception。在下面的例子中,session 的值未被正確的初始化,於是,future 塊中計算將丟擲一個 NullPointerException。這樣,future f 失敗了。

val session =null
val f:Future[List[Friend]]=Future{
session.getFriends

}

上面的 import ExecutionContext.Implicits.global 匯入預設的全域性執行上下文(global execution context)。執行上下文執行提交給他們的任務,你也可把執行上下文看作執行緒池,這對future方法是必不可少的,因為,它們處理如何和何時執行非同步計算。你可以定義自己的執行上下文,並用 future 使用,但現在,只需要知道你能夠通過上面的語句匯入預設執行上下文就足夠了。

我們的例子是基於一個假想的社交網路 API,計算包含了傳送網路請求和等待響應。下面,假設你有一個文字檔案,想找出一個特定詞第一次出現的位置。當磁碟正在檢索此檔案時,這個計算過程可能會陷入阻塞,因此,並行執行程式的剩餘部分將很有意義。

val firstOccurrence:Future[Int]=Future{
val source = scala.io.Source.fromFile("e:scalamyText.txt")
source.toSeq.indexOfSlice("myKeyword")
}


回撥函式

現在,我們知道如何開始一個非同步計算來建立一個新的future值,但是我們沒有演示一旦此結果變得可用後如何使用。我們經常對計算結果感興趣而不僅僅是它的副作用(side-effects)。

在許多future的實現中,一旦future的客戶端對結果感興趣,它必須阻塞它自己的計算,並等待直到future完成——然後才能使用future的值繼續它自己的計算。雖然這在Scala Future API(在後面會展示)中是允許的,但從效能角度來看更好的辦法是完全非阻塞,即在future中註冊一個回撥。一旦future完成,就非同步呼叫回撥。如果當註冊回撥,future已經完成,那麼,回撥或是非同步執行,或在相同的執行緒中循序執行。

註冊回撥最通常的形式,是使用OnComplete方法,即建立一個Try[T] => U 型別的回撥函式。如果future成功完成,回撥則會應用到Success[T]型別的值中,否則應用到 Failure[T]型別的值中。

Try[T]Option[T] Either[T, S] 相似,因為它是一個可能持有某種型別值的單子(monda)。然而,它是為持有一個值或異常物件特殊設計的。Option[T] 既可以是一個值(如:Some[T])也可以完全不是值(如:None),如果Try[T]獲得一個值是,那麼它是Success[T] ,否則為持有異常的 Failure[T]Failure[T] 有很多資訊,不僅僅是關於為什麼沒有值 None。同時,也可以把Try[T] 看作一種特殊版本的 Either[Throwable, T],特別是當左邊值為一個 Throwable 的情形。

“一個單子(Monad)說白了不過就是自函子範疇上的一個么半群而已。”這句話出自Haskell大神Philip Wadler,也是他提議把Monad引入Haskell。

回到我們社交網路的例子,假設,我們想獲取最近的帖子並顯示在螢幕上,可以通過呼叫 getRecentPosts 方法,它返回 List[String]:

import scala.util.{Success,Failure}


val f:Future[List[String]]=Future{
    session.getRecentPosts
}


f onComplete {
    caseSuccess(posts)=>for(post <- posts) println(post)
    caseFailure(t)=> println("An error has occured: "+ t.getMessage)
}

onComplete 方法允許客戶處理失敗或成功的future 結果。對於成功,onSuccess 回撥使用如下:

val f:Future[List[String]]=Future{
    session.getRecentPosts
}

f onSuccess {
    case posts =>for(post <- posts
) 

println(post)

}

對於失敗,onFailure 回撥使用如下:

val f:Future[List[String]]=Future{
    session.getRecentPosts
}

f onFailure {
    case t => println("An error has occured: "+ t.getMessage)
}

f onSuccess {
    case posts =>for(post <- posts) println(post)
}

onFailure 回撥只有在 future 失敗,也就是包含一個異常時才會執行。

因為部分函式(partial functions)具有 isDefinedAt 方法, 所以,onFailure 方法只有為了特定 Throwable 而定義才會觸發。下面的例子,已註冊的onFailure 回撥永遠不會被觸發:

val f =Future{

2/0

}

 

f onFailure {

case npe:NullPointerException=>

println("I'd be amazed if this printed out.")

}


部分函式(Partial functions),假設有一個數學函式 f(a,b,c)partial(f,1)返回的是數學函式 f(1,b,c),函式的引數 a 已經被代入。

回到前面例子,查詢某個第一次出現的關鍵字,在螢幕上輸出該關鍵字的位置:

val firstOccurrence:Future[Int]=Future{

val source = scala.io.Source.fromFile("myText.txt")

source.toSeq.indexOfSlice("myKeyword")

}

firstOccurrence onSuccess {

case idx => println("The keyword first appears at position: "+ idx)

}

firstOccurrence onFailure {

case t => println("Could not process file: "+ t.getMessage)

}


onComplete,、onSuccess 和 onFailure 方法都具有結果型別 Unit,這意味著這些回撥方法不能被連結。注意,這種設計是為了避免鏈式呼叫可能隱含在已註冊回撥上一個順序的執行(同一個 future 中註冊的回撥是無序的)。

也就是說,我們現在應討論論何時呼叫回撥。因為回撥需要future 中的值是可用的,只有future完成後才能被呼叫。然而,不能保證被完成 future 的執行緒或建立回撥的執行緒呼叫。反而, 回撥有時會在future物件完成後被某個執行緒呼叫。我們可以說,回撥最終會被執行。

更進一步,回撥被執行的順序不是預先定義的,甚至在同一個應用程式。事實上,回撥也許不是一個接一個連續呼叫的,但在同一時間併發呼叫。這意味著,下面例子中,變數 totalA 也許不能從計算的文字中得到大小寫字母數量的正確值。

@volatilevar totalA =0

val text =Future{

"na"*16+"BATMAN!!!"

}

text onSuccess {

case txt => totalA += txt.count(_ =='a')

}


看著論壇牛人的部落格寫的,自己還是太嫩,沒那麼厲害能想這麼多呢,給大家分享下