1. 程式人生 > >akka router簡單例項

akka router簡單例項

參考連結:http://sunxiang0918.cn/2016/01/13/Akka-in-JAVA-2/#Router

參考連結:http://blog.csdn.net/liubenlong007/article/details/54574064

通常在分散式任務排程系統中會有這樣的需求:一組actor提供相同的服務,我們在呼叫任務的時候只需要選擇其中一個actor進行處理即可。 
其實這就是一個負載均衡或者說路由策略,akka作為一個高效能支援併發的actor模型,可以用來作為任務排程叢集使用,當然負載均衡就是其本職工作了,akka提供了Router來進行訊息的排程。

在真實的情況中,通常針對某一種訊息,會啟動很多個相同的Actor來進行處理.當然,你可以在程式中迴圈的啟動很多個相同的Actor來實現,就如上一小結中啟動100個Actor那樣,但是這就牽涉到Actor任務的平衡,Actor個數的維護等等,比較的麻煩.因此,在AKKA中存在一種特殊的Actor,即Router

.Akka通過Router機制,來有效的分配訊息給actor來完成工作.而在AKKA中,被Router管理的actor被稱作Routee.

根據專案的需求,可以使用不同的路由策略來分發一個訊息到actor中.Akka附帶了幾個常用的路由策略,配置起就可以使用.當然,也可以自定義一個路由器.


InboxActor2

package com.eastcom.first.spark.data.akka.router;

import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class InboxActor2 extends UntypedActor {

	private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

	@Override
	public void onReceive(Object o) throws Throwable {
		if (o == MsgEnum.WORKING) {
			log.info("i am working.");
		} else if (o == MsgEnum.DONE) {
			log.info("i am done");
		} else if (o == MsgEnum.CLOSE) {
			log.info("i am close." + getContext().self().path());
			getContext().stop(getSelf());// 關閉自己
		} else {
			unhandled(o);
		}
	}

}

InboxMain2
package com.eastcom.first.spark.data.akka.router;

import java.util.concurrent.atomic.AtomicBoolean;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.RoundRobinPool;

public class InboxMain2 {

	public static AtomicBoolean flag = new AtomicBoolean(true);

	public static void main(String[] args) throws InterruptedException {

		// ActorSystem system = ActorSystem.create("routerTest",
		// ConfigFactory.load("akka.config"));
		ActorSystem system = ActorSystem.create("System");
		ActorRef workerRouter = system.actorOf(Props.create(InboxActor2.class).withRouter(new RoundRobinPool(3)),
				"RouterTest");
		int i = 1;
		int workSize = 0;
		while (flag.get()) {
			workerRouter.tell(MsgEnum.WORKING, ActorRef.noSender());
			if (i % 3 == 0) {
				workerRouter.tell(MsgEnum.CLOSE, ActorRef.noSender());
				workSize++;
				if (workSize >= 3) {
					flag.set(false);
				}
			}
			Thread.sleep(1000);
			i++;
		}

		system.terminate();
	}

}

執行結果
[INFO] [08/10/2017 18:10:06.696] [System-akka.actor.default-dispatcher-4] [akka://System/user/RouterTest/$a] i am working.
[INFO] [08/10/2017 18:10:07.687] [System-akka.actor.default-dispatcher-4] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:08.687] [System-akka.actor.default-dispatcher-4] [akka://System/user/RouterTest/$c] i am working.
[INFO] [08/10/2017 18:10:08.688] [System-akka.actor.default-dispatcher-4] [akka://System/user/RouterTest/$a] i am close.akka://System/user/RouterTest/$a
[INFO] [08/10/2017 18:10:09.699] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:10.699] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$c] i am working.
[INFO] [08/10/2017 18:10:11.699] [System-akka.actor.default-dispatcher-2] [akka://System/user/RouterTest/$c] i am close.akka://System/user/RouterTest/$c
[INFO] [08/10/2017 18:10:11.699] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:12.703] [System-akka.actor.default-dispatcher-6] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:13.702] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:14.702] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am working.
[INFO] [08/10/2017 18:10:14.702] [System-akka.actor.default-dispatcher-3] [akka://System/user/RouterTest/$b] i am close.akka://System/user/RouterTest/$b