1. 程式人生 > >Actor監控與容錯機制

Actor監控與容錯機制

       在如今的IT行業中,如果一個系統不具備高可用以及穩定性,那麼它遲早會被淘汰的。也許有時出現故障並不是程式碼本身的問題,網路(連線超時、讀超時等)、伺服器故障、使用者操作有誤等原因,也不能影響使用者的體驗。程式發生故障時,我們要儘可能保證使用者體驗不受影響,這就需要我們的系統具有良好的容錯機制,能保證服務在不可用時自我修復。

     監督策略

       Actor作為Akka中最基本的執行單元,是所有業務的核心,如若發生故障,可能產生不可估量的後果,這就需要擁有一套容錯機制。前面我們已經提到,Actor系統具有監控機制,父級對子級進行管理,子級如果出現異常情況,父級可以通過處理邏輯來確定子級是重啟、恢復、還是停止。

       Akka提供了兩種監督策略,分別是One-For-One Strategy和All-For-One Strategy。關於這兩種策略的區別如下:

策略

描述

One-For-One Strategy

子Actor出現異常,只針對該Actor處理。

All-For-One Strategy

子Actor出現異常,對所有子級Actor進行處理。

       在程式中,當我們沒有為Actor指定管理策略,Akka提供了一套預設的策略:

  • 丟擲ActorInitializationException、ActorKilledException 、DeathPactException 時,停止子Actor執行。
  • 丟擲Exception時,重啟子Actor。
  • 其它型別的Throwable異常時,會上溯到父級。

       實際專案中,我們可能自定義了許多異常,以此來區分不同的情況。針對這些不同的異常,我們處理Actor方式或許不一樣,這就需要我們自定義監督策略。SupervisorStrategy是自定義監督策略核心類,建構函式如下:

public OneForOneStrategy(int maxNrOfRetries, java.time.Duration withinTimeRange, PartialFunction<Throwable, Directive> decider)

      我們可以看到它有三個引數,分別是maxNrOfRetries、withinTimeRange、decider。其中maxNrOfRetries和withinTimeRange表明在一段時間的重啟次數,超過次數則stop。第三個引數是一個Function物件,我們可以在裡面定義監督指令,使用DeciderBuilder構建工廠可以快速編寫,監督指令有:

指令

描述

API

重新啟動

會在舊例項上呼叫preRestart方法,停掉所有子級actor並呼叫postStop方法,然後在新例項上呼叫postRestart方法(預設會呼叫preStart方法),不會保留舊例項的狀態,保留郵箱(訊息)。

SupervisorStrategy.restart()

恢復執行

出現異常,不會丟擲,保留之前的狀態。

SupervisorStrategy.resume()

停止執行

銷燬actor相關資源,後續訊息不會接受。

SupervisorStrategy.stop()

上溯父級

父級無法處理異常訊息,並把異常上報上一級父級。

SupervisorStrategy.escalate()

       下面,我們寫一個示例來觀察這幾個指令,示例如下:

         監控者SupervisorActor類:

public class SupervisorActor extends AbstractActor {
    /**
     * 自定義OneForOneStrategy監督策略
     */
    private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.ofMinutes(1),
            DeciderBuilder.match(IOException.class, e -> SupervisorStrategy.resume())
                    .match(NullPointerException.class, e -> SupervisorStrategy.restart())
                    .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
                    .matchAny(o -> SupervisorStrategy.escalate())
                    .build());
    private final LoggingAdapter logger = Logging.getLogger(getContext().getSystem(), this);


    public static void main(String[] args) throws InterruptedException {
        ActorSystem system = ActorSystem.create("system");
        ActorRef supervisor = system.actorOf(Props.create(SupervisorActor.class), "supervisor");
        supervisor.tell("IOException", ActorRef.noSender());
        //supervisor.tell("NullPointerException", ActorRef.noSender());
        //supervisor.tell("IllegalArgumentException", ActorRef.noSender());
        supervisor.tell("get", ActorRef.noSender());
    }

    @Override
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }

    @Override
    public void preStart() throws Exception {
        ActorRef childActor = getContext().actorOf(Props.create(ChildActor.class), "childActor");
        //監控
        getContext().watch(childActor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(String.class, s -> {
            Option<ActorRef> childActor = getContext().child("childActor");
            ActorRef child = childActor.get();
            if ("IOException".equals(s)) {
                child.tell(new IOException(), getSelf());
            } else if ("NullPointerException".equals(s)) {
                child.tell(new NullPointerException("空指標異常"), getSelf());
            } else if ("IllegalArgumentException".equals(s)) {
                child.tell(new IllegalArgumentException(), getSelf());
            }else if("get".equals(s)){
                child.tell("get", getSelf());
            }
        }).match(Terminated.class, t -> {
            logger.info("監控到" + t.getActor() + "停止了");
        }).matchAny(other -> {
            logger.info("state= " + other);
        }).build();
    }
}

子類ChildActor ,我們重寫preStart、postStop、preRestart、postRestart四個方法,方便我們觀察不同指令下的流程:

public class ChildActor extends AbstractActor {
    /**
     * 用來觀察不同處理後的結果
     */
    private int state = 1;

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Exception.class, e -> {
                    this.state++;
                    throw e;
                })
                .matchEquals("get", s -> {
                    this.state++;
                    getSender().tell(state, getSelf());
                })
                .matchAny(o -> {
                    this.state++;
                    unhandled(o);
                })
                .build();
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        System.out.println("child preStart");
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        System.out.println("child postStop");
    }

    @Override
    public void preRestart(Throwable reason, Optional<Object> message) throws Exception {
        System.out.println("child preRestart start: " + this.state);
        super.preRestart(reason, message);
        System.out.println("child preRestart end: " + this.state);
    }

    @Override
    public void postRestart(Throwable reason) throws Exception {
        System.out.println("child postRestart start: " + this.state);
        super.postRestart(reason);
        System.out.println("child postRestart end: " + this.state);
    }
}

      這裡特別提醒一下,ChildActor的createReceive方法中的this.state++必須寫在每個匹配項裡,如若寫在createReceive方法裡,只會執行一次。

       執行SupervisorActor 的main方法,測試三種異常帶來的不同狀態結果(執行結果不貼出來了,大家可以執行一下示例),  結果說明如下:

異常

結果及說明

IOException

匹配到resume指令,childActor恢復繼續執行,大家應該發現沒有丟擲異常,異常自動跳過。state值為3,說明之前的狀態保留了下來。

NullPointerException

匹配到restart指令,childActor進行重啟。根據執行結果,大家應該發現,會先呼叫舊例項的preRestart方法,然後呼叫postStop,之後在新例項上呼叫postRestart,呼叫preStart方法。重啟後發現state=2,說明重啟會清空之前的狀態。

IllegalArgumentException

匹配到stop指令,childActor停止執行,後續獲取state訊息沒有接受到,父類supervisor收到Terminated訊息。

      正確的使用監控和容錯機制,可以在程式出現異常或故障時,保證系統不崩潰,對於系統的高可用和穩定性提供極大的保障。