akka router簡單例項
阿新 • • 發佈:2018-12-25
參考連結:http://sunxiang0918.cn/2016/01/13/Akka-in-JAVA-2/#Router
參考連結:http://blog.csdn.net/liubenlong007/article/details/54574064
通常在分散式任務排程系統中會有這樣的需求:一組actor提供相同的服務,我們在呼叫任務的時候只需要選擇其中一個actor進行處理即可。
其實這就是一個負載均衡或者說路由策略,akka作為一個高效能支援併發的actor模型,可以用來作為任務排程叢集使用,當然負載均衡就是其本職工作了,akka提供了Router
來進行訊息的排程。
在真實的情況中,通常針對某一種訊息,會啟動很多個相同的Actor來進行處理.當然,你可以在程式中迴圈的啟動很多個相同的Actor來實現,就如上一小結中啟動100個Actor那樣,但是這就牽涉到Actor任務的平衡,Actor個數的維護等等,比較的麻煩.因此,在AKKA中存在一種特殊的Actor,即Router
Router
機制,來有效的分配訊息給actor來完成工作.而在AKKA中,被Router
管理的actor被稱作Routee
.
根據專案的需求,可以使用不同的路由策略來分發一個訊息到actor中.Akka附帶了幾個常用的路由策略,配置起就可以使用.當然,也可以自定義一個路由器.
InboxActor2
package com.eastcom.first.spark.data.akka.router; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; public class InboxActor2 extends UntypedActor { private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); @Override public void onReceive(Object o) throws Throwable { if (o == MsgEnum.WORKING) { log.info("i am working."); } else if (o == MsgEnum.DONE) { log.info("i am done"); } else if (o == MsgEnum.CLOSE) { log.info("i am close." + getContext().self().path()); getContext().stop(getSelf());// 關閉自己 } else { unhandled(o); } } }
InboxMain2
package com.eastcom.first.spark.data.akka.router; import java.util.concurrent.atomic.AtomicBoolean; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.routing.RoundRobinPool; public class InboxMain2 { public static AtomicBoolean flag = new AtomicBoolean(true); public static void main(String[] args) throws InterruptedException { // ActorSystem system = ActorSystem.create("routerTest", // ConfigFactory.load("akka.config")); ActorSystem system = ActorSystem.create("System"); ActorRef workerRouter = system.actorOf(Props.create(InboxActor2.class).withRouter(new RoundRobinPool(3)), "RouterTest"); int i = 1; int workSize = 0; while (flag.get()) { workerRouter.tell(MsgEnum.WORKING, ActorRef.noSender()); if (i % 3 == 0) { workerRouter.tell(MsgEnum.CLOSE, ActorRef.noSender()); workSize++; if (workSize >= 3) { flag.set(false); } } Thread.sleep(1000); i++; } system.terminate(); } }
執行結果
[INFO] [08/10/2017 18:10:06.696] [System-akka.actor.default-dispatcher-4] [akka://System/user/RouterTest/$a] i am working.
[INFO] [08/10/2017 18:10:07.687] [System-akka.actor.default-dispatcher-4] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:08.687] [System-akka.actor.default-dispatcher-4] [akka://System/user/RouterTest/$c] i am working.
[INFO] [08/10/2017 18:10:08.688] [System-akka.actor.default-dispatcher-4] [akka://System/user/RouterTest/$a] i am close.akka://System/user/RouterTest/$a
[INFO] [08/10/2017 18:10:09.699] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:10.699] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$c] i am working.
[INFO] [08/10/2017 18:10:11.699] [System-akka.actor.default-dispatcher-2] [akka://System/user/RouterTest/$c] i am close.akka://System/user/RouterTest/$c
[INFO] [08/10/2017 18:10:11.699] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:12.703] [System-akka.actor.default-dispatcher-6] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:13.702] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:14.702] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:14.702] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am close.akka://System/user/RouterTest/$b