1. 程式人生 > >討喜的隔離可變性(四)收發訊息

討喜的隔離可變性(四)收發訊息

宣告:本文是《Java虛擬機器併發程式設計》的第五章,感謝華章出版社授權併發程式設計網站釋出此文,禁止以任何形式轉載此文。

我們可以向角色傳送任何型別的訊息——String、Integer、Long、Double、List、Map、元組(tuples)、Scala的case類…但其中有一點需要注意的是,上述所有型別的訊息都必須是不可變的。在上述這些型別中,我對於元組有著特殊的偏好,這並非因為我聽到別人把元組誤讀成“two-ples”時感到很有趣,而是由於元組是輕量的、不可變的並且是最容易建立的例項之一。例如,在Scala中,我們可以簡單地用(number1,number2)來建立一個含有兩個數字的元組。除了元組之外,Scala的case類也是用來定義訊息的理想型別——因為case類是不可變的、可以進行模式匹配並且還很容易進行復制。在Java中,我們可以通過將訊息定義為一個不可修改(unmodifiable)的Collection的方式來將多個物件塞到一個訊息中。當我們向角色傳遞訊息時,如果傳送者和接收者都在同一個JVM裡

[1],則預設情況下我們傳遞的是訊息的引用。需要注意的是,保證所傳遞訊息的不可變性是程式設計師自己的責任,尤其是當所傳送的訊息是我們自定義的類時則更需要加倍小心。為了解決這個問題,我們可以讓Akka替我們先將訊息序列化,然後將序列化出來的拷貝而不是原物件的引用傳送出去,這樣就可以避免由於類定義不嚴謹所造成的問題。

與角色互動最簡單的方式莫過於“傳送並忘記(fire and forget)”,即先將訊息發出去,然後不等響應繼續做下面的事。這種做法從效能角度考慮也是最好的選擇。其中,傳送動作是由呼叫方角色/執行緒發起的一個非阻塞操作。我們可以使用sendOneWay()函式或Scala的!函式來發送一個單向訊息。

除了“傳送並忘記”互動模式之外,Akka還提供了雙向訊息互動模式,以應對我們在發出訊息之後需要等待對端角色響應的情況。在這種模式下,呼叫執行緒將被阻塞,直至收到對方響應或達到超時時間為止。下面讓我們一起來看看如何在Java和Scala中收發訊息。

在Java中收發訊息

我們可以通過sendRequestReply()函式來發送訊息並等待接收方響應。如果接收方的響應未在(可配置的)超時時間內到達,則系統將丟擲ActorTimeoutException異常。下面讓我們通過一個示例來學習這種雙向訊息通訊方式:

public class FortuneTeller extends UntypedActor {
public void onReceive(final Object name) {
getContext().replyUnsafe(String.format("%s you'll rock", name));
}
public static void main(final String[] args) {
final ActorRef fortuneTeller =
Actors.actorOf(FortuneTeller.class).start();
try {
final Object response = fortuneTeller.sendRequestReply("Joe");
System.out.println(response);
} catch(ActorTimeoutException ex) {
System.out.println("Never got a response before timeout");
} finally {
fortuneTeller.stop();
}
}
}

在上面的程式碼中,我們定義了一個名為FortuneTeller的角色,它對收到的訊息都會直接進行回覆。為了傳送回覆訊息,我們需要先呼叫getContext()函式獲取呼叫上下文,然後再呼叫其replyUnsafe()函式來發送訊息內容。呼叫replyUnsafe()函式所執行的傳送動作是非阻塞的,並且請注意,在傳送響應訊息的過程中我們沒有呼叫任何與角色有關的程式碼。在main()函式中我們呼叫了sendRequestReply()函式,該函式會在內部建立一個Future類並等待對方響應或超時丟擲異常。下面讓我們通過執行上述程式碼來看看Joe的命運如何:

Joe you’ll rock

我們上面實現的這個FortuneTeller實際上還有個問題沒解決,即該角色依賴於訊息傳送者的傳送方式。當訊息傳送方呼叫sendRequestReply()函式時,該函式會建立一個內部的Future用於等待對方響應。而如果我們換用sendOneWay()來發送訊息的話,則replyUnsafe()函式將會失敗。為了避免這種情況的發生,我們需要在呼叫replyUnsafe()函式之前先檢查一下是否能匹配到一個處於阻塞狀態的傳送者。我們可以通過從上下文中讀取傳送者引用的方式來進行這個檢查,也可以通過replySafe()函式的返回值來進行判斷。因為當能取到傳送者的引用時該函式會返回true,反之則返回false。下面我們就著手對FortuneTeller進行修改,以使其可以處理髮送者沒有阻塞地等待響應訊息的情況:

public class FortuneTeller extends UntypedActor {
public void onReceive(final Object name) {
if(getContext().replySafe(String.format("%s you'll rock", name)))
System.out.println("Message sent for " + name);
else
System.out.println("Sender not found for " + name);
}
public static void main(final String[] args) {
final ActorRef fortuneTeller =
Actors.actorOf(FortuneTeller.class).start();
try {
fortuneTeller.sendOneWay("Bill");
final Object response = fortuneTeller.sendRequestReply("Joe");
System.out.println(response);
} catch(ActorTimeoutException ex) {
System.out.println("Never got a response before timeout");
} finally {
fortuneTeller.stop();
}
}
}

如上所示,新版的FortuneTeller程式碼很優雅地處理了我們之前提到的那些問題,即使接收方沒找到傳送者也不會導致處理失敗。

Sender not found for Bill
Message sent for Joe
Joe you'll rock

我們知道,sendRequestReply()函式是需要等待對方響應的阻塞式函式,而sendOneWay()函式則是單向且非阻塞的。而如果既想要接收響應又不想被阻塞,則可以使用更復雜一些的sendRequestReplyFuture()函式。該函式可以返回一個Future物件,而拿到Future物件之後我們就可以繼續幹其他的事,直到我們真正需要用到對方的響應時,再選擇阻塞式地等待或通過之前拿到的那個Future物件來查詢對方的響應是否已經可用。類似地,在角色這一側我們可以從上下文引用中取到senderFuture,並通過它來與傳送方進行通訊。在後面的示例中,我們會看到上述這些函式的具體用法,這裡就不再贅述了。

請務必謹慎使用sendRequestReply()和sendRequestReplyFuture()函式,因為這兩個函式都是阻塞的,所以呼叫它們對程式的效能和可擴充套件性都會造成負面影響。

在Scala中收發訊息

如果想要在Scala中與角色進行訊息收發,我們需要有些心理準備,因為在Scala中我們所採用的方法將會與Java API有不小的差別:

在Scala中,我們可以直接使用self屬性來訪問actor。通過該屬性,我們可以呼叫reply()函式或replySafe()函式,其中reply()就是replyUnsafe()在Scala側的等價函式。

在Scala中,我們既可以呼叫sendRequestReply()函式,也可以呼叫更優雅的 !! 函式——當然美是人者見仁智者見智的。同樣地,!!!也可以被用來替換sendRequestReplyFuture()函式。

在Scala中,sendRequestReply()函式不再返回一個Object,而是返回一個Scala的Option。當接收方的響應抵達時,這個Option將是一個Some[T]的例項。該例項中存有響應的具體內容,而在超時的情況下則響應內容為None。所以,與Java版本所不同的是,在Scala中呼叫sendRequestReply()函式在超時的時候不會拋異常。

下面讓我們先用不安全的reply()函式實現Scala版的FortuneTeller:

class FortuneTeller extends Actor {
def receive = {
case name : String =>
self.reply(String.format("%s you'll rock", name))
}
}
176 • Chapter 8. Favoring Isolated Mutability
object FortuneTeller {
def main(args : Array[String]) : Unit = {
val fortuneTeller = Actor.actorOf[FortuneTeller].start()
val response = fortuneTeller !! "Joe"
response match {
case Some(responseMessage) => println(responseMessage)
case None => println("Never got a response before timeout")
}
fortuneTeller.stop()
}
}

在角色的實現程式碼中,我們可以看到與Java版本的兩點不同:其一是這裡中我們用self代替了getContext()函式,另一個則是用reply()代替了replyUnsafe()函式。在呼叫方這一側,我們使用了!!,即java中的sendRequestReply()函式來給角色傳送訊息,並在所收到的響應內容上應用了模式匹配。如果傳送方收到對端的響應,則第一個case語句將被執行,而如果響應超時則第二個case語句將被執行。不出所料,該示例程式碼的執行結果與Java版完全相同:

Joe you'll rock

除了我們之前所討論過的那些變更之外,安全版reply()函式的使用方式與Java版本差別不大。在Scala中,我們使用的是reply_?()或replySafe()。

class FortuneTeller extends Actor {
def receive = {
case name : String =>
if(self.reply_?(String.format("%s you'll rock", name)))
println("Message sent for " + name)
else
println("Sender not found for " + name)
}
}
object FortuneTeller {
def main(args : Array[String]) : Unit = {
val fortuneTeller = Actor.actorOf[FortuneTeller].start()
fortuneTeller ! "Bill"
val response = fortuneTeller !! "Joe"
response match {
case Some(responseMessage) => println(responseMessage)
case None => println("Never got a response before timeout")
}
fortuneTeller.stop()
}
}

通過上述修改,即使是在傳送者未知的情況下,新版本的FortureTeller也不會失敗:

Sender not found for Bill
Message sent for Joe
Joe you'll rock

Akka有一點很方便的行為就是,當我們用Akka傳送訊息時,它會將傳送者的引用透明地傳遞過去。於是我們就無需顯式地將傳送者作為訊息的一部分傳遞出去,從而省去了很多繁冗的程式碼。

如果不習慣使用像!、!!、!!!以及reply_?這樣的函式名,我們也可以分別用sendOneWay()、sendRequestReply()、sendRequestReplyFuture()以及replySafe()這些函式來替換使用。


[1] Akka同樣支援遠端角色,以便使我們可以在不同機器的離散程序之間傳送訊息。


方 騰飛

花名清英,併發網(ifeve.com)創始人,暢銷書《Java併發程式設計的藝術》作者,螞蟻金服技術專家。目前工作於支付寶微貸事業部,關注網際網路金融,併發程式設計和敏捷實踐。微信公眾號aliqinying。