Actor監控與容錯機制
在如今的IT行業中,如果一個系統不具備高可用以及穩定性,那麼它遲早會被淘汰的。也許有時出現故障並不是程式碼本身的問題,網路(連線超時、讀超時等)、伺服器故障、使用者操作有誤等原因,也不能影響使用者的體驗。程式發生故障時,我們要儘可能保證使用者體驗不受影響,這就需要我們的系統具有良好的容錯機制,能保證服務在不可用時自我修復。
監督策略
Actor作為Akka中最基本的執行單元,是所有業務的核心,如若發生故障,可能產生不可估量的後果,這就需要擁有一套容錯機制。前面我們已經提到,Actor系統具有監控機制,父級對子級進行管理,子級如果出現異常情況,父級可以通過處理邏輯來確定子級是重啟、恢復、還是停止。
Akka提供了兩種監督策略,分別是One-For-One Strategy和All-For-One Strategy。關於這兩種策略的區別如下:
策略 |
描述 |
One-For-One Strategy |
子Actor出現異常,只針對該Actor處理。 |
All-For-One Strategy |
子Actor出現異常,對所有子級Actor進行處理。 |
在程式中,當我們沒有為Actor指定管理策略,Akka提供了一套預設的策略:
- 丟擲ActorInitializationException、ActorKilledException 、DeathPactException 時,停止子Actor執行。
- 丟擲Exception時,重啟子Actor。
- 其它型別的Throwable異常時,會上溯到父級。
實際專案中,我們可能自定義了許多異常,以此來區分不同的情況。針對這些不同的異常,我們處理Actor方式或許不一樣,這就需要我們自定義監督策略。SupervisorStrategy是自定義監督策略核心類,建構函式如下:
public OneForOneStrategy(int maxNrOfRetries, java.time.Duration withinTimeRange, PartialFunction<Throwable, Directive> decider)
我們可以看到它有三個引數,分別是maxNrOfRetries、withinTimeRange、decider。其中maxNrOfRetries和withinTimeRange表明在一段時間的重啟次數,超過次數則stop。第三個引數是一個Function物件,我們可以在裡面定義監督指令,使用DeciderBuilder構建工廠可以快速編寫,監督指令有:
指令 |
描述 |
API |
重新啟動 |
會在舊例項上呼叫preRestart方法,停掉所有子級actor並呼叫postStop方法,然後在新例項上呼叫postRestart方法(預設會呼叫preStart方法),不會保留舊例項的狀態,保留郵箱(訊息)。 |
SupervisorStrategy.restart() |
恢復執行 |
出現異常,不會丟擲,保留之前的狀態。 |
SupervisorStrategy.resume() |
停止執行 |
銷燬actor相關資源,後續訊息不會接受。 |
SupervisorStrategy.stop() |
上溯父級 |
父級無法處理異常訊息,並把異常上報上一級父級。 |
SupervisorStrategy.escalate() |
下面,我們寫一個示例來觀察這幾個指令,示例如下:
監控者SupervisorActor類:
public class SupervisorActor extends AbstractActor {
/**
* 自定義OneForOneStrategy監督策略
*/
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.ofMinutes(1),
DeciderBuilder.match(IOException.class, e -> SupervisorStrategy.resume())
.match(NullPointerException.class, e -> SupervisorStrategy.restart())
.match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
.matchAny(o -> SupervisorStrategy.escalate())
.build());
private final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);
public static void main(String[] args) throws InterruptedException {
ActorSystem system = ActorSystem.create("system");
ActorRef supervisor = system.actorOf(Props.create(SupervisorActor.class), "supervisor");
supervisor.tell("IOException", ActorRef.noSender());
//supervisor.tell("NullPointerException", ActorRef.noSender());
//supervisor.tell("IllegalArgumentException", ActorRef.noSender());
supervisor.tell("get", ActorRef.noSender());
}
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
@Override
public void preStart() throws Exception {
ActorRef childActor = getContext().actorOf(Props.create(ChildActor.class), "childActor");
//監控
getContext().watch(childActor);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class, s -> {
Option<ActorRef> childActor = getContext().child("childActor");
ActorRef child = childActor.get();
if ("IOException".equals(s)) {
child.tell(new IOException(), getSelf());
} else if ("NullPointerException".equals(s)) {
child.tell(new NullPointerException("空指標異常"), getSelf());
} else if ("IllegalArgumentException".equals(s)) {
child.tell(new IllegalArgumentException(), getSelf());
}else if("get".equals(s)){
child.tell("get", getSelf());
}
}).match(Terminated.class, t -> {
logger.info("監控到" + t.getActor() + "停止了");
}).matchAny(other -> {
logger.info("state= " + other);
}).build();
}
}
子類ChildActor ,我們重寫preStart、postStop、preRestart、postRestart四個方法,方便我們觀察不同指令下的流程:
public class ChildActor extends AbstractActor {
/**
* 用來觀察不同處理後的結果
*/
private int state = 1;
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Exception.class, e -> {
this.state++;
throw e;
})
.matchEquals("get", s -> {
this.state++;
getSender().tell(state, getSelf());
})
.matchAny(o -> {
this.state++;
unhandled(o);
})
.build();
}
@Override
public void preStart() throws Exception {
super.preStart();
System.out.println("child preStart");
}
@Override
public void postStop() throws Exception {
super.postStop();
System.out.println("child postStop");
}
@Override
public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
System.out.println("child preRestart start: " + this.state);
super.preRestart(reason, message);
System.out.println("child preRestart end: " + this.state);
}
@Override
public void postRestart(Throwable reason) throws Exception {
System.out.println("child postRestart start: " + this.state);
super.postRestart(reason);
System.out.println("child postRestart end: " + this.state);
}
}
這裡特別提醒一下,ChildActor的createReceive方法中的this.state++必須寫在每個匹配項裡,如若寫在createReceive方法裡,只會執行一次。
執行SupervisorActor 的main方法,測試三種異常帶來的不同狀態結果(執行結果不貼出來了,大家可以執行一下示例), 結果說明如下:
異常 |
結果及說明 |
IOException |
匹配到resume指令,childActor恢復繼續執行,大家應該發現沒有丟擲異常,異常自動跳過。state值為3,說明之前的狀態保留了下來。 |
NullPointerException |
匹配到restart指令,childActor進行重啟。根據執行結果,大家應該發現,會先呼叫舊例項的preRestart方法,然後呼叫postStop,之後在新例項上呼叫postRestart,呼叫preStart方法。重啟後發現state=2,說明重啟會清空之前的狀態。 |
IllegalArgumentException |
匹配到stop指令,childActor停止執行,後續獲取state訊息沒有接受到,父類supervisor收到Terminated訊息。 |
正確的使用監控和容錯機制,可以在程式出現異常或故障時,保證系統不崩潰,對於系統的高可用和穩定性提供極大的保障。