討喜的隔離可變性(四)收發訊息
宣告:本文是《Java虛擬機器併發程式設計》的第五章,感謝華章出版社授權併發程式設計網站釋出此文,禁止以任何形式轉載此文。
我們可以向角色傳送任何型別的訊息——String、Integer、Long、Double、List、Map、元組(tuples)、Scala的case類…但其中有一點需要注意的是,上述所有型別的訊息都必須是不可變的。在上述這些型別中,我對於元組有著特殊的偏好,這並非因為我聽到別人把元組誤讀成“two-ples”時感到很有趣,而是由於元組是輕量的、不可變的並且是最容易建立的例項之一。例如,在Scala中,我們可以簡單地用(number1,number2)來建立一個含有兩個數字的元組。除了元組之外,Scala的case類也是用來定義訊息的理想型別——因為case類是不可變的、可以進行模式匹配並且還很容易進行復制。在Java中,我們可以通過將訊息定義為一個不可修改(unmodifiable)的Collection的方式來將多個物件塞到一個訊息中。當我們向角色傳遞訊息時,如果傳送者和接收者都在同一個JVM裡
與角色互動最簡單的方式莫過於“傳送並忘記(fire and forget)”,即先將訊息發出去,然後不等響應繼續做下面的事。這種做法從效能角度考慮也是最好的選擇。其中,傳送動作是由呼叫方角色/執行緒發起的一個非阻塞操作。我們可以使用sendOneWay()函式或Scala的!函式來發送一個單向訊息。
除了“傳送並忘記”互動模式之外,Akka還提供了雙向訊息互動模式,以應對我們在發出訊息之後需要等待對端角色響應的情況。在這種模式下,呼叫執行緒將被阻塞,直至收到對方響應或達到超時時間為止。下面讓我們一起來看看如何在Java和Scala中收發訊息。
在Java中收發訊息
我們可以通過sendRequestReply()函式來發送訊息並等待接收方響應。如果接收方的響應未在(可配置的)超時時間內到達,則系統將丟擲ActorTimeoutException異常。下面讓我們通過一個示例來學習這種雙向訊息通訊方式:
public class FortuneTeller extends UntypedActor { public void onReceive(final Object name) { getContext().replyUnsafe(String.format("%s you'll rock", name)); } public static void main(final String[] args) { final ActorRef fortuneTeller = Actors.actorOf(FortuneTeller.class).start(); try { final Object response = fortuneTeller.sendRequestReply("Joe"); System.out.println(response); } catch(ActorTimeoutException ex) { System.out.println("Never got a response before timeout"); } finally { fortuneTeller.stop(); } } }
在上面的程式碼中,我們定義了一個名為FortuneTeller的角色,它對收到的訊息都會直接進行回覆。為了傳送回覆訊息,我們需要先呼叫getContext()函式獲取呼叫上下文,然後再呼叫其replyUnsafe()函式來發送訊息內容。呼叫replyUnsafe()函式所執行的傳送動作是非阻塞的,並且請注意,在傳送響應訊息的過程中我們沒有呼叫任何與角色有關的程式碼。在main()函式中我們呼叫了sendRequestReply()函式,該函式會在內部建立一個Future類並等待對方響應或超時丟擲異常。下面讓我們通過執行上述程式碼來看看Joe的命運如何:
Joe you’ll rock
我們上面實現的這個FortuneTeller實際上還有個問題沒解決,即該角色依賴於訊息傳送者的傳送方式。當訊息傳送方呼叫sendRequestReply()函式時,該函式會建立一個內部的Future用於等待對方響應。而如果我們換用sendOneWay()來發送訊息的話,則replyUnsafe()函式將會失敗。為了避免這種情況的發生,我們需要在呼叫replyUnsafe()函式之前先檢查一下是否能匹配到一個處於阻塞狀態的傳送者。我們可以通過從上下文中讀取傳送者引用的方式來進行這個檢查,也可以通過replySafe()函式的返回值來進行判斷。因為當能取到傳送者的引用時該函式會返回true,反之則返回false。下面我們就著手對FortuneTeller進行修改,以使其可以處理髮送者沒有阻塞地等待響應訊息的情況:
public class FortuneTeller extends UntypedActor { public void onReceive(final Object name) { if(getContext().replySafe(String.format("%s you'll rock", name))) System.out.println("Message sent for " + name); else System.out.println("Sender not found for " + name); } public static void main(final String[] args) { final ActorRef fortuneTeller = Actors.actorOf(FortuneTeller.class).start(); try { fortuneTeller.sendOneWay("Bill"); final Object response = fortuneTeller.sendRequestReply("Joe"); System.out.println(response); } catch(ActorTimeoutException ex) { System.out.println("Never got a response before timeout"); } finally { fortuneTeller.stop(); } } }
如上所示,新版的FortuneTeller程式碼很優雅地處理了我們之前提到的那些問題,即使接收方沒找到傳送者也不會導致處理失敗。
Sender not found for Bill Message sent for Joe Joe you'll rock
我們知道,sendRequestReply()函式是需要等待對方響應的阻塞式函式,而sendOneWay()函式則是單向且非阻塞的。而如果既想要接收響應又不想被阻塞,則可以使用更復雜一些的sendRequestReplyFuture()函式。該函式可以返回一個Future物件,而拿到Future物件之後我們就可以繼續幹其他的事,直到我們真正需要用到對方的響應時,再選擇阻塞式地等待或通過之前拿到的那個Future物件來查詢對方的響應是否已經可用。類似地,在角色這一側我們可以從上下文引用中取到senderFuture,並通過它來與傳送方進行通訊。在後面的示例中,我們會看到上述這些函式的具體用法,這裡就不再贅述了。
請務必謹慎使用sendRequestReply()和sendRequestReplyFuture()函式,因為這兩個函式都是阻塞的,所以呼叫它們對程式的效能和可擴充套件性都會造成負面影響。
在Scala中收發訊息
如果想要在Scala中與角色進行訊息收發,我們需要有些心理準備,因為在Scala中我們所採用的方法將會與Java API有不小的差別:
在Scala中,我們可以直接使用self屬性來訪問actor。通過該屬性,我們可以呼叫reply()函式或replySafe()函式,其中reply()就是replyUnsafe()在Scala側的等價函式。
在Scala中,我們既可以呼叫sendRequestReply()函式,也可以呼叫更優雅的 !! 函式——當然美是人者見仁智者見智的。同樣地,!!!也可以被用來替換sendRequestReplyFuture()函式。
在Scala中,sendRequestReply()函式不再返回一個Object,而是返回一個Scala的Option。當接收方的響應抵達時,這個Option將是一個Some[T]的例項。該例項中存有響應的具體內容,而在超時的情況下則響應內容為None。所以,與Java版本所不同的是,在Scala中呼叫sendRequestReply()函式在超時的時候不會拋異常。
下面讓我們先用不安全的reply()函式實現Scala版的FortuneTeller:
class FortuneTeller extends Actor { def receive = { case name : String => self.reply(String.format("%s you'll rock", name)) } } 176 • Chapter 8. Favoring Isolated Mutability object FortuneTeller { def main(args : Array[String]) : Unit = { val fortuneTeller = Actor.actorOf[FortuneTeller].start() val response = fortuneTeller !! "Joe" response match { case Some(responseMessage) => println(responseMessage) case None => println("Never got a response before timeout") } fortuneTeller.stop() } }
在角色的實現程式碼中,我們可以看到與Java版本的兩點不同:其一是這裡中我們用self代替了getContext()函式,另一個則是用reply()代替了replyUnsafe()函式。在呼叫方這一側,我們使用了!!,即java中的sendRequestReply()函式來給角色傳送訊息,並在所收到的響應內容上應用了模式匹配。如果傳送方收到對端的響應,則第一個case語句將被執行,而如果響應超時則第二個case語句將被執行。不出所料,該示例程式碼的執行結果與Java版完全相同:
Joe you'll rock
除了我們之前所討論過的那些變更之外,安全版reply()函式的使用方式與Java版本差別不大。在Scala中,我們使用的是reply_?()或replySafe()。
class FortuneTeller extends Actor { def receive = { case name : String => if(self.reply_?(String.format("%s you'll rock", name))) println("Message sent for " + name) else println("Sender not found for " + name) } } object FortuneTeller { def main(args : Array[String]) : Unit = { val fortuneTeller = Actor.actorOf[FortuneTeller].start() fortuneTeller ! "Bill" val response = fortuneTeller !! "Joe" response match { case Some(responseMessage) => println(responseMessage) case None => println("Never got a response before timeout") } fortuneTeller.stop() } }
通過上述修改,即使是在傳送者未知的情況下,新版本的FortureTeller也不會失敗:
Sender not found for Bill Message sent for Joe Joe you'll rock
Akka有一點很方便的行為就是,當我們用Akka傳送訊息時,它會將傳送者的引用透明地傳遞過去。於是我們就無需顯式地將傳送者作為訊息的一部分傳遞出去,從而省去了很多繁冗的程式碼。
如果不習慣使用像!、!!、!!!以及reply_?這樣的函式名,我們也可以分別用sendOneWay()、sendRequestReply()、sendRequestReplyFuture()以及replySafe()這些函式來替換使用。
[1] Akka同樣支援遠端角色,以便使我們可以在不同機器的離散程序之間傳送訊息。