1. 程式人生 > >Akka之訊息路由

Akka之訊息路由

Akka提供了非常靈活的訊息傳送機制。有時候我們使用一組Actor來提供服務,這一組Actor中所有的Actor都是對等的,也就是說你可以找任何一個Actor來為你服務。這種情況下,如何才能快速有效地找到合適的Actor呢?或者說如何排程這些訊息,才可以使負載更為均衡地分配在這一組Actor中呢?為了解決這個問題,Akka使用一個路由器元件(Router)來封裝訊息的排程。系統提供了幾種實用的訊息路由策略,比如,輪詢選擇Actor進行訊息傳送,隨機訊息傳送,將訊息傳送給最為空閒的Actor,甚至是在組內廣播訊息。

例項

package com.bzb.java8.akka;

import
akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; /** * @author bzb * @Description: 當前示例中的唯一一個Actor * @date 2018/9/14 14:16 */ public class MyWorker extends UntypedActor { private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); public
static enum Msg { WORKING, DONE, CLOSE; } @Override public void onReceive(Object msg) throws Exception { if (msg == Msg.WORKING) { System.out.println("I am working"); } if (msg == Msg.DONE) { System.out.println("Stop working"); } if
(msg == Msg.CLOSE) { System.out.println("I will shutdow"); getSender().tell(Msg.CLOSE, getSelf()); // 收到CLOSE訊息時,關閉自己,結束執行。 getContext().stop(getSelf()); } else { unhandled(msg); } } }
package com.bzb.java8.akka;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.ActorRefRoutee;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Routee;
import akka.routing.Router;

import java.util.ArrayList;
import java.util.List;

/**
 * @author bzb
 * @Description: 監視者,如同勞動監工一樣,一旦被監視者因為停止工作,則監視者就會收到一條訊息
 * @date 2018/9/14 14:27
 */
public class WatchActor extends UntypedActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    // 定義路由器元件Router
    public Router router;

    {
        List<Routee> routees = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            ActorRef worker = getContext().actorOf(Props.create(MyWorker.class), "worker-" + i);
            getContext().watch(worker);
            routees.add(new ActorRefRoutee(worker));
        }

        // 構造Router時,需要指定路由策略和一組被路由的Actor(Routee)。這裡使用了RoundRobinRoutingLogin,也就是對所有的Routee輪詢傳送訊息。
        /**
         * RoundRobinRoutingLogic: 輪詢
         * BroadcastRoutingLogic: 廣播
         * RandomRoutingLogic: 隨機
         * SmallestMailboxRoutingLogic: 空閒
         */
        router = new Router(new RoundRobinRoutingLogic(), routees);
    }

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof MyWorker.Msg) {
            // 當需要投遞訊息給這5個worker時,只需要將訊息投遞給這個Router即可。
            // Router給根據給定的路由策略進行訊息投遞。
            router.route(msg, getSender());
        } else if (msg instanceof Terminated) { // 終止訊息

            // 當一個worker停止工作時,可以簡單的將其從工作組中移除
            router = router.removeRoutee(((Terminated) msg).actor());

            // 列印終止的Actor路徑和剩餘工作組的大小
            System.out.println(((Terminated) msg).actor().path() + " is closed, routees = " + router.routees().size());

            // 如果系統中沒有可以的Actor,就會直接關閉系統
            if (router.routees().size() == 0) {
                System.out.println("Closed Systme");
                RouteMain.flag.send(false);
                getContext().system().shutdown();
            }
         } else {
            unhandled(msg);
        }
    }
}
package com.bzb.java8.akka;


import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.agent.Agent;
import akka.dispatch.ExecutionContexts;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.ExecutionContext;

/**
 * @author bzb
 * @Description:
 * @date 2018/9/14 17:42
 */
public class RouteMain {

    public static Agent<Boolean> flag = Agent.create(true, ExecutionContexts.global());

    public static void main(String[] args) throws InterruptedException {
        ActorSystem system = ActorSystem.create("route", ConfigFactory.load("samplehello.conf"));

        ActorRef watcher = system.actorOf(Props.create(WatchActor.class), "watcher");

        int i = 1;

        // 不停傳送訊息,知道flag為false,即路由繫結的工作組中的Actor為0
        while (flag.get()) {
            watcher.tell(MyWorker.Msg.WORKING, ActorRef.noSender());
            if (i % 10 == 0) {
                watcher.tell(MyWorker.Msg.CLOSE, ActorRef.noSender()); // 每10次傳送一條關閉訊息
            }
            i++;
            Thread.sleep(100);
        }
    }
}

參考資源

<實戰java高併發程式設計>