使用Akka構建集群(二)
前言
在《使用Akka構建集群(一)》一文中通過簡單集群監聽器的例子演示了如何使用Akka搭建一個簡單的集群,但是這個例子“也許”離我們的實際業務場景太遠,你基本不太可能去做這樣的工作,除非你負責運維、監控相關的工作(但實際上一個合格的程序員在實現功能的同時,也應當考慮監控的問題,至少應當接入一些監控系統或框架)。
本文將介紹一個相對看來更符合我們對於集群使用的業務需求的例子——將客戶端請求的字符串轉換為大寫(假如客戶端真的沒有這個能力的話)。
服務端
本文的Akka配置繼續沿用《使用Akka構建集群(一)》一文中所展示的配置,但在正式編碼之前我們需要在配置中增加一個新的配置項akka.cluster.roles指定集群中服務端的角色,重新編輯過後的application.conf如下:
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://[email protected]:2551", "akka.tcp://[email protected]:2552"] #//#snippet # excluded from snippet auto-down-unreachable-after = 10s #//#snippet # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. # # auto-down-unreachable-after = 10s roles = [backend] # Disable legacy metrics in akka-cluster. metrics.enabled=off } }
你仍然不需要過多產生於集群直接相關的細節。如果你已經閱讀了《使用Akka構建集群(一)》一文,本文介紹的內容應該不會花費你太多的時間。
客戶端與服務端通信需要一些pojo,它們的實現如下:
public interface TransformationMessages { public static class TransformationJob implements Serializable { private final String text; public TransformationJob(String text) { this.text = text; } public String getText() { return text; } } public static class TransformationResult implements Serializable { private final String text; public TransformationResult(String text) { this.text = text; } public String getText() { return text; } @Override public String toString() { return "TransformationResult(" + text + ")"; } } public static class JobFailed implements Serializable { private final String reason; private final TransformationJob job; public JobFailed(String reason, TransformationJob job) { this.reason = reason; this.job = job; } public String getReason() { return reason; } public TransformationJob getJob() { return job; } @Override public String toString() { return "JobFailed(" + reason + ")"; } } public static final String BACKEND_REGISTRATION = "BackendRegistration"; }
TransformationJob代表待轉換的任務,其text屬性是需要處理的字符串文本;TransformationResult是任務處理的結果,其text屬性是轉換完成的字符串文本;JobFailed是任務失敗,其reason屬性代表失敗原因;字符串常量BACKEND_REGISTRATION用於服務端向客戶端註冊,以便於客戶端知道有哪些服務端可以提供服務。
服務端用於將字符串轉換為大寫的Actor(正如我之前的文章所言,真正的處理應當從Actor中分離出去,只少通過接口解耦)的實現見代碼清單1所示。
代碼清單1
@Named("TransformationBackend") @Scope("prototype") public class TransformationBackend extends UntypedActor { private static Logger logger = LoggerFactory.getLogger(TransformationBackend.class); Cluster cluster = Cluster.get(getContext().system()); // subscribe to cluster changes, MemberUp @Override public void preStart() { cluster.subscribe(getSelf(), MemberUp.class); } // re-subscribe when restart @Override public void postStop() { cluster.unsubscribe(getSelf()); } @Override public void onReceive(Object message) { if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; logger.info(job.getText()); getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf()); } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; for (Member member : state.getMembers()) { if (member.status().equals(MemberStatus.up())) { register(member); } } } else if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; register(mUp.member()); } else { unhandled(message); } } void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/transformationFrontend").tell(BACKEND_REGISTRATION, getSelf()); } }
TransformationBackend在preStart方法中訂閱了集群的MemberUp事件,這樣當它發現新註冊的集群成員節點的角色是frontend(前端)時,將向此節點發送BACKEND_REGISTRATION消息,後者將會知道前者提供了服務。TransformationBackend所在的節點在剛剛加入集群時,TransformationBackend還會收到CurrentClusterState消息,從中可以解析出集群中的所有前端節點(即roles為frontend的),並向其發送BACKEND_REGISTRATION消息。經過以上兩步可以確保集群中的前端節點和後端節點無論啟動或加入集群的順序怎樣變化,都不會影響後端節點通知所有的前端節點及前端節點知道哪些後端節點提供了服務。
客戶端
客戶端除了監聽端口不同外,也需要增加akka.cluster.roles配置項,我們指定為frontend。客戶端的配置如下:
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://[email protected]:2551", "akka.tcp://[email protected]:2552"] #//#snippet # excluded from snippet auto-down-unreachable-after = 10s #//#snippet # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. # # auto-down-unreachable-after = 10s roles = [frontend] } }
客戶端用於處理轉換任務的Actor見代碼清單2所說。
代碼清單2
@Named("TransformationFrontend") @Scope("prototype") public class TransformationFrontend extends UntypedActor { List<ActorRef> backends = new ArrayList<ActorRef>(); int jobCounter = 0; @Override public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } } }
可以看到TransformationFrontend處理的消息分為以下三種:
- BACKEND_REGISTRATION:收到此消息說明有服務端通知客戶端,TransformationFrontend首先將服務端的ActorRef加入backends列表,然後對服務端的ActorRef添加監管;
- Terminated:由於TransformationFrontend對服務端的ActorRef添加了監管,所以當服務端進程奔潰或者重啟時,將收到Terminated消息,此時TransformationFrontend將此服務端的ActorRef從backends列表中移除;
- TransformationJob:此消息說明有新的轉換任務需要TransformationFrontend處理,處理分兩種情況:
- backends列表為空,則向發送此任務的發送者返回JobFailed消息,並告知“目前沒有服務端可用,請稍後再試”;
- backends列表不為空,則通過取模運算選出一個服務端,將TransformationJob轉發給服務端進一步處理;
運行展示
初始化服務端TransformationBackend的代碼如下:
logger.info("Start transformationBackend"); final ActorRef transformationBackend = actorSystem.actorOf(springExt.props("TransformationBackend"), "transformationBackend"); actorMap.put("transformationBackend", transformationBackend); logger.info("Started transformationBackend");
初始化客戶端TransformationFrontend的代碼如下:
logger.info("Start transformationFrontend"); final ActorRef transformationFrontend = actorSystem .actorOf(springExt.props("TransformationFrontend"), "transformationFrontend"); actorMap.put("transformationFrontend", transformationFrontend); logger.info("Started transformationFrontend"); final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS); final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); final ExecutionContext ec = actorSystem.dispatcher(); final AtomicInteger counter = new AtomicInteger(); actorSystem.scheduler().schedule(interval, interval, new Runnable() { public void run() { ask(transformationFrontend, new TransformationJob("hello-" + counter.incrementAndGet()), timeout) .onSuccess(new OnSuccess<Object>() { public void onSuccess(Object result) { logger.info(result.toString()); } }, ec); } }, ec);
可以看到我們在客戶端每2秒將發送一個新的消息,這個消息以“hello-”開頭,後邊是一個不斷自增的數字。當收到處理結果後,客戶端還會將結果打印出來。
我們以3個服務端節點(host相同,端口分別為2551、2552及隨機)、1個客戶端節點(端口隨機)組成的集群為例,我們首先啟動第一個種子節點,然後以任意順序啟動其它服務端或者客戶端節點(啟動順序問題在《使用Akka構建集群(一)》一文中已介紹,此處不再贅述),集群成員變化的日誌如下圖:
從上面展示的日誌中可以看到集群的3個服務端節點和1個客戶端節點先後加入集群的信息。
我們再來看看端口為57222的角色為frontend的節點的日誌信息,如下圖:
從frontend的日誌看出,它已經打印了大寫得HELLO-3到HELLO-10十條任務處理結果。那麽這些任務分別是由集群中的哪些節點負責處理的?我們首先來看看端口為2551的backend節點,其處理任務的日誌如下圖:
看來2551節點處理了hello-4、hello-7及hello-10三條任務。我們再來看看端口為2552的backend節點,其處理任務的日誌如下圖:
可以看到2552節點處理了hello-2、hello-5及hello-8三條任務。最後看看端口為57211的backend節點,其處理任務的日誌如下圖:
可以看到從hello-3到hello-10這8條處理任務被均衡的分配給了3個不同的後端節點處理。奇怪的是hello-1這條消息居然沒有任何顯示,那是因為前端節點剛開始處理消息時,backends列表裏還沒有緩存好任何backend的ActorRef。我們向上查找frontend節點的日誌,在相隔很遠的日誌中發現了下面的輸出:
這也印證了我們的猜測。
總結
根據本文的例子,大家應當看到使用Akka構建集群,開發人員只需要關註消息的發送與接收,而無需過多涉及集群的細節。無論前端還是後端節點都可以加入同一個集群,而且多個後端節點處理消息也能達到負載均衡的功效。
其它Akka應用的博文如下:
- 《Spring與Akka的集成》;
- 《使用Akka的遠程調用》;
- 《使用Akka構建集群(一)》;
- 《使用Akka構建集群(二)》;
- 《使用Akka持久化——持久化與快照》;
- 《使用Akka持久化——消息發送與接收》;
使用Akka構建集群(二)