AKKA文件(java版)—容錯
原文連結 譯者:小魚
正如角色系統這一章中解釋的一樣,每一個角色都是它孩子的監管者,並且像這樣的角色都會定義錯誤處理監管策略。這個策略在成為角色系統結構的一個完整部分之後是不能被改變的。
錯誤處理實踐
首先,讓我們看一個處理資料儲存錯誤的例子,它是實踐應用中一個典型的錯誤根源。當然它基於真實應用,這個應用的資料儲存有可能是無效的,但我們在這個例子中會用一個最有效的重連方法來實現。
讀下面的原始碼。內嵌的註釋解釋了錯誤處理的不同塊和為什麼要新增它們。強烈的建議去執行這個例子,這樣才能更簡單的去跟蹤這個日誌輸出,來了解執行的時候發生了什麼。
建立一個監管策略
下面的章節解釋了錯誤處理機制和更深入的選擇。
根據示範的目的,讓我們來考慮如下策略:
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.create("1 minute"), new Function<Throwable, Directive>() { @Override public Directive apply(Throwable t) { if (t instanceof ArithmeticException) { return resume(); } else if (t instanceof NullPointerException) { return restart(); } else if (t instanceof IllegalArgumentException) { return stop(); } else { return escalate(); } } }); @Override public SupervisorStrategy supervisorStrategy() { return strategy; }
我選擇了一些大家熟知的異常型別,是為了展示在監管和監視章節中描述的錯誤處理指令的應用。首先,它是一個一對一的策略,意味著每一個孩子都是分開處理的(多對一的策略工作非常類似,唯一的不同就是任何決策都會應用到監管者的所有孩子,不僅僅是發生錯誤的那個)。在重啟頻率上會有一些限制,最大是每分鐘重啟10次。-1和Duration.Inf()意味著限制沒有應用,拋開這個可能性,去指定一個絕對的上限或者去讓這個重啟的工作沒有上限。當超出這個限制,孩子角色就被停止。
注意:如果策略在監管角色(而不是一個單獨的類)中描述了,它的決策者可以線上程安全的形勢下訪問角色的所有內部狀態,包括獲得當前發生錯誤的孩子的引用(例如錯誤訊息的getSender)。
預設監管策略
如果定義的策略沒有覆蓋丟擲的異常,Escalate會被使用。當沒有為一個角色定義監管策略,如下的異常會按照預設來處理:
1. ActorInitializationException會停止發生錯誤的子角色
2. ActorKilledException會停止發生錯誤的子角色
3. Exception會重啟發生錯誤的子角色
4. 別的丟擲型別會升級到父角色
如果異常升級到根監管者,會按上述的預設策略處理。
停止監管策略
跟Erlang方式類似的策略是當它們失敗的時候只停止子角色,以及當DeathWatch通知丟失的子角色的時候會對監管者採取正確的動作。
記錄角色失敗的資訊
預設SupervisorStrategy會記錄失敗資訊除非它們被向上升級。建議在更高層次的結構中處理上升的錯誤,並潛在的記錄下來。
在初始化的時候你可以通過設定SupervisorStrategy的loggingEnabled為false來去掉預設的日誌。可以在Decider裡定製日誌。注意如getSender一樣,當SupervisorStrategy在監管角色中描述,當前失敗的子角色引用是有效的。
你可以通重寫logFailure方法在你自己的SupervisorStrategy實現中定製化日誌。
最高層次角色的監管
最高層次角色意味著它們是通過system.actorOf()建立的,並且它們是User Guardian的孩子。在這種情況下沒有特定的規則,守護者僅僅應用配置策略。
測試應用
下面章節展示了實踐中不同指令的效果,wherefor測試啟動是需要的。首先,我們需要一個匹配的監管者。
public class Supervisor extends UntypedActor { private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.create("1 minute"), new Function<Throwable, Directive>() { @Override public Directive apply(Throwable t) { if (t instanceof ArithmeticException) { return resume(); } else if (t instanceof NullPointerException) { return restart(); } else if (t instanceof IllegalArgumentException) { return stop(); } else { return escalate(); } } }); @Override public SupervisorStrategy supervisorStrategy() { return strategy; } public void onReceive(Object o) { if (o instanceof Props) { getSender().tell(getContext().actorOf((Props) o), getSelf()); } else { unhandled(o); } } }
這個監管者會被用於建立子角色,我們可以實驗一下:
public class Child extends UntypedActor { int state = 0; public void onReceive(Object o) throws Exception { if (o instanceof Exception) { throw (Exception) o; } else if (o instanceof Integer) { state = (Integer) o; } else if (o.equals("get")) { getSender().tell(state, getSelf()); } else { unhandled(o); } } }
測試使用Testing Actor Systems裡介紹的實用工具會比較簡單,TestProbe提供一個Actor Ref用於接收和檢查訊息回覆。
import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.SupervisorStrategy; import static akka.actor.SupervisorStrategy.resume; import static akka.actor.SupervisorStrategy.restart; import static akka.actor.SupervisorStrategy.stop; import static akka.actor.SupervisorStrategy.escalate; import akka.actor.SupervisorStrategy.Directive; import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.Terminated; import akka.actor.UntypedActor; import scala.collection.immutable.Seq; import scala.concurrent.Await; import static akka.pattern.Patterns.ask; import scala.concurrent.duration.Duration; import akka.testkit.AkkaSpec; import akka.testkit.TestProbe; public class FaultHandlingTest { static ActorSystem system; Duration timeout = Duration.create(5, SECONDS); @BeforeClass public static void start() { system = ActorSystem.create("test", AkkaSpec.testConf()); } @AfterClass public static void cleanup() { JavaTestKit.shutdownActorSystem(system); system = null; } @Test public void mustEmploySupervisorStrategy() throws Exception { // code here } }
讓我們建立角色
Props superprops = Props.create(Supervisor.class); ActorRef supervisor = system.actorOf(superprops, "supervisor"); ActorRef child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout);
第一個測試會演示Resume指令,所以我們嘗試通過把角色的狀態設定成非初始化狀態並讓它失敗:
child.tell(42, ActorRef.noSender()); assert Await.result(ask(child, "get", 5000), timeout).equals(42); child.tell(new ArithmeticException(), ActorRef.noSender()); assert Await.result(ask(child, "get", 5000), timeout).equals(42);
你可以看到值42讓錯誤處理指令存活下來。現在,如果我們把錯誤改成一個更嚴重的NullPointerException異常,則將不會是這種情況:
child.tell(new NullPointerException(), ActorRef.noSender()); assert Await.result(ask(child, "get", 5000), timeout).equals(0);
發生IllegalArgumentException致命異常的情況下,最終會導致子角色會被監管者中斷:
final TestProbe probe = new TestProbe(system); probe.watch(child); child.tell(new IllegalArgumentException(), ActorRef.noSender()); probe.expectMsgClass(Terminated.class);
到目前為止,監管者還沒有完全受到子角色失敗的影響,因為指令集會處理它。萬一丟擲一個異常,監管者會向上丟擲錯誤。
child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout); probe.watch(child); assert Await.result(ask(child, "get", 5000), timeout).equals(0); child.tell(new Exception(), ActorRef.noSender()); probe.expectMsgClass(Terminated.class);
監管者自己是由ActorSystem提供的最高等級的角色監管的,在所有的異常(注意這兩個異常
ActorInitializationException和ActorKilledException)情況下,預設策略是重新啟動。一旦預設的指令重啟去殺死所有的孩子,我們期望我們的窮孩子不要倖存這個錯誤。
這不是所希望的(這依賴於用例),我們需要去用一個不同的監管者重寫這個行為。
public class Supervisor2 extends UntypedActor { private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.create("1 minute"), new Function<Throwable, Directive>() { @Override public Directive apply(Throwable t) { if (t instanceof ArithmeticException) { return resume(); } else if (t instanceof NullPointerException) { return restart(); } else if (t instanceof IllegalArgumentException) { return stop(); } else { return escalate(); } } }); @Override public SupervisorStrategy supervisorStrategy() { return strategy; } public void onReceive(Object o) { if (o instanceof Props) { getSender().tell(getContext().actorOf((Props) o), getSelf()); } else { unhandled(o); } } @Override public void preRestart(Throwable cause, Option<Object> msg) { // do not kill all children, which is the default here } }
通過父親,孩子角色倖存向上升級重啟,如最後一段測試程式碼所演示的:
superprops = Props.create(Supervisor2.class); supervisor = system.actorOf(superprops); child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout); child.tell(23, ActorRef.noSender()); assert Await.result(ask(child, "get", 5000), timeout).equals(23); child.tell(new Exception(), ActorRef.noSender()); assert Await.result(ask(child, "get", 5000), timeout).equals(0);