Akka之訊息路由
阿新 • • 發佈:2018-12-09
Akka提供了非常靈活的訊息傳送機制。有時候我們使用一組Actor來提供服務,這一組Actor中所有的Actor都是對等的,也就是說你可以找任何一個Actor來為你服務。這種情況下,如何才能快速有效地找到合適的Actor呢?或者說如何排程這些訊息,才可以使負載更為均衡地分配在這一組Actor中呢?為了解決這個問題,Akka使用一個路由器元件(Router)來封裝訊息的排程。系統提供了幾種實用的訊息路由策略,比如,輪詢選擇Actor進行訊息傳送,隨機訊息傳送,將訊息傳送給最為空閒的Actor,甚至是在組內廣播訊息。
例項
package com.bzb.java8.akka;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
/**
* @author bzb
* @Description: 當前示例中的唯一一個Actor
* @date 2018/9/14 14:16
*/
public class MyWorker extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public static enum Msg {
WORKING, DONE, CLOSE;
}
@Override
public void onReceive(Object msg) throws Exception {
if (msg == Msg.WORKING) {
System.out.println("I am working");
}
if (msg == Msg.DONE) {
System.out.println("Stop working");
}
if (msg == Msg.CLOSE) {
System.out.println("I will shutdow");
getSender().tell(Msg.CLOSE, getSelf());
// 收到CLOSE訊息時,關閉自己,結束執行。
getContext().stop(getSelf());
} else {
unhandled(msg);
}
}
}
package com.bzb.java8.akka;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.routing.ActorRefRoutee;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Routee;
import akka.routing.Router;
import java.util.ArrayList;
import java.util.List;
/**
* @author bzb
* @Description: 監視者,如同勞動監工一樣,一旦被監視者因為停止工作,則監視者就會收到一條訊息
* @date 2018/9/14 14:27
*/
public class WatchActor extends UntypedActor {
private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
// 定義路由器元件Router
public Router router;
{
List<Routee> routees = new ArrayList<>();
for (int i = 0; i < 5; i++) {
ActorRef worker = getContext().actorOf(Props.create(MyWorker.class), "worker-" + i);
getContext().watch(worker);
routees.add(new ActorRefRoutee(worker));
}
// 構造Router時,需要指定路由策略和一組被路由的Actor(Routee)。這裡使用了RoundRobinRoutingLogin,也就是對所有的Routee輪詢傳送訊息。
/**
* RoundRobinRoutingLogic: 輪詢
* BroadcastRoutingLogic: 廣播
* RandomRoutingLogic: 隨機
* SmallestMailboxRoutingLogic: 空閒
*/
router = new Router(new RoundRobinRoutingLogic(), routees);
}
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof MyWorker.Msg) {
// 當需要投遞訊息給這5個worker時,只需要將訊息投遞給這個Router即可。
// Router給根據給定的路由策略進行訊息投遞。
router.route(msg, getSender());
} else if (msg instanceof Terminated) { // 終止訊息
// 當一個worker停止工作時,可以簡單的將其從工作組中移除
router = router.removeRoutee(((Terminated) msg).actor());
// 列印終止的Actor路徑和剩餘工作組的大小
System.out.println(((Terminated) msg).actor().path() + " is closed, routees = " + router.routees().size());
// 如果系統中沒有可以的Actor,就會直接關閉系統
if (router.routees().size() == 0) {
System.out.println("Closed Systme");
RouteMain.flag.send(false);
getContext().system().shutdown();
}
} else {
unhandled(msg);
}
}
}
package com.bzb.java8.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.agent.Agent;
import akka.dispatch.ExecutionContexts;
import com.typesafe.config.ConfigFactory;
import scala.concurrent.ExecutionContext;
/**
* @author bzb
* @Description:
* @date 2018/9/14 17:42
*/
public class RouteMain {
public static Agent<Boolean> flag = Agent.create(true, ExecutionContexts.global());
public static void main(String[] args) throws InterruptedException {
ActorSystem system = ActorSystem.create("route", ConfigFactory.load("samplehello.conf"));
ActorRef watcher = system.actorOf(Props.create(WatchActor.class), "watcher");
int i = 1;
// 不停傳送訊息,知道flag為false,即路由繫結的工作組中的Actor為0
while (flag.get()) {
watcher.tell(MyWorker.Msg.WORKING, ActorRef.noSender());
if (i % 10 == 0) {
watcher.tell(MyWorker.Msg.CLOSE, ActorRef.noSender()); // 每10次傳送一條關閉訊息
}
i++;
Thread.sleep(100);
}
}
}
參考資源
<實戰java高併發程式設計>