使用Akka構建叢集(二)
前言
在《使用Akka構建叢集(一)》一文中通過簡單叢集監聽器的例子演示瞭如何使用Akka搭建一個簡單的叢集,但是這個例子“也許”離我們的實際業務場景太遠,你基本不太可能去做這樣的工作,除非你負責運維、監控相關的工作(但實際上一個合格的程式設計師在實現功能的同時,也應當考慮監控的問題,至少應當接入一些監控系統或框架)。
本文將介紹一個相對看來更符合我們對於叢集使用的業務需求的例子——將客戶端請求的字串轉換為大寫(假如客戶端真的沒有這個能力的話)。
服務端
本文的Akka配置繼續沿用《使用Akka構建叢集(一)》一文中所展示的配置,但在正式編碼之前我們需要在配置中增加一個新的配置項akka.cluster.roles指定叢集中服務端的角色,重新編輯過後的application.conf如下:
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 2551 } } cluster { seed-nodes = [ "akka.tcp://[email protected]:2551", "akka.tcp://[email protected]:2552"] #//#snippet # excluded from snippet auto-down-unreachable-after = 10s #//#snippet # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. # # auto-down-unreachable-after = 10s roles = [backend] # Disable legacy metrics in akka-cluster. metrics.enabled=off } }
你仍然不需要過多產生於叢集直接相關的細節。如果你已經閱讀了《使用Akka構建叢集(一)》一文,本文介紹的內容應該不會花費你太多的時間。
客戶端與服務端通訊需要一些pojo,它們的實現如下:
public interface TransformationMessages { public static class TransformationJob implements Serializable { private final String text; public TransformationJob(String text) { this.text = text; } public String getText() { return text; } } public static class TransformationResult implements Serializable { private final String text; public TransformationResult(String text) { this.text = text; } public String getText() { return text; } @Override public String toString() { return "TransformationResult(" + text + ")"; } } public static class JobFailed implements Serializable { private final String reason; private final TransformationJob job; public JobFailed(String reason, TransformationJob job) { this.reason = reason; this.job = job; } public String getReason() { return reason; } public TransformationJob getJob() { return job; } @Override public String toString() { return "JobFailed(" + reason + ")"; } } public static final String BACKEND_REGISTRATION = "BackendRegistration"; }
TransformationJob代表待轉換的任務,其text屬性是需要處理的字串文字;TransformationResult是任務處理的結果,其text屬性是轉換完成的字串文字;JobFailed是任務失敗,其reason屬性代表失敗原因;字串常量BACKEND_REGISTRATION用於服務端向客戶端註冊,以便於客戶端知道有哪些服務端可以提供服務。
服務端用於將字串轉換為大寫的Actor(正如我之前的文章所言,真正的處理應當從Actor中分離出去,只少通過介面解耦)的實現見程式碼清單1所示。
程式碼清單1
@Named("TransformationBackend") @Scope("prototype") public class TransformationBackend extends UntypedActor { private static Logger logger = LoggerFactory.getLogger(TransformationBackend.class); Cluster cluster = Cluster.get(getContext().system()); // subscribe to cluster changes, MemberUp @Override public void preStart() { cluster.subscribe(getSelf(), MemberUp.class); } // re-subscribe when restart @Override public void postStop() { cluster.unsubscribe(getSelf()); } @Override public void onReceive(Object message) { if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; logger.info(job.getText()); getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf()); } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; for (Member member : state.getMembers()) { if (member.status().equals(MemberStatus.up())) { register(member); } } } else if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; register(mUp.member()); } else { unhandled(message); } } void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/transformationFrontend").tell(BACKEND_REGISTRATION, getSelf()); } }
TransformationBackend在preStart方法中訂閱了叢集的MemberUp事件,這樣當它發現新註冊的叢集成員節點的角色是frontend(前端)時,將向此節點發送BACKEND_REGISTRATION訊息,後者將會知道前者提供了服務。TransformationBackend所在的節點在剛剛加入叢集時,TransformationBackend還會收到CurrentClusterState訊息,從中可以解析出叢集中的所有前端節點(即roles為frontend的),並向其傳送BACKEND_REGISTRATION訊息。經過以上兩步可以確保叢集中的前端節點和後端節點無論啟動或加入叢集的順序怎樣變化,都不會影響後端節點通知所有的前端節點及前端節點知道哪些後端節點提供了服務。
客戶端
客戶端除了監聽埠不同外,也需要增加akka.cluster.roles配置項,我們指定為frontend。客戶端的配置如下:
akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster { seed-nodes = [ "akka.tcp://[email protected]:2551", "akka.tcp://[email protected]:2552"] #//#snippet # excluded from snippet auto-down-unreachable-after = 10s #//#snippet # auto downing is NOT safe for production deployments. # you may want to use it during development, read more about it in the docs. # # auto-down-unreachable-after = 10s roles = [frontend] } }
客戶端用於處理轉換任務的Actor見程式碼清單2所說。
程式碼清單2
@Named("TransformationFrontend") @Scope("prototype") public class TransformationFrontend extends UntypedActor { List<ActorRef> backends = new ArrayList<ActorRef>(); int jobCounter = 0; @Override public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } } }
可以看到TransformationFrontend處理的訊息分為以下三種:
- BACKEND_REGISTRATION:收到此訊息說明有服務端通知客戶端,TransformationFrontend首先將服務端的ActorRef加入backends列表,然後對服務端的ActorRef新增監管;
- Terminated:由於TransformationFrontend對服務端的ActorRef添加了監管,所以當服務端程序奔潰或者重啟時,將收到Terminated訊息,此時TransformationFrontend將此服務端的ActorRef從backends列表中移除;
- TransformationJob:此訊息說明有新的轉換任務需要TransformationFrontend處理,處理分兩種情況:
- backends列表為空,則向傳送此任務的傳送者返回JobFailed訊息,並告知“目前沒有服務端可用,請稍後再試”;
- backends列表不為空,則通過取模運算選出一個服務端,將TransformationJob轉發給服務端進一步處理;
執行展示
初始化服務端TransformationBackend的程式碼如下:
logger.info("Start transformationBackend"); final ActorRef transformationBackend = actorSystem.actorOf(springExt.props("TransformationBackend"), "transformationBackend"); actorMap.put("transformationBackend", transformationBackend); logger.info("Started transformationBackend");
初始化客戶端TransformationFrontend的程式碼如下:
logger.info("Start transformationFrontend"); final ActorRef transformationFrontend = actorSystem .actorOf(springExt.props("TransformationFrontend"), "transformationFrontend"); actorMap.put("transformationFrontend", transformationFrontend); logger.info("Started transformationFrontend"); final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS); final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); final ExecutionContext ec = actorSystem.dispatcher(); final AtomicInteger counter = new AtomicInteger(); actorSystem.scheduler().schedule(interval, interval, new Runnable() { public void run() { ask(transformationFrontend, new TransformationJob("hello-" + counter.incrementAndGet()), timeout) .onSuccess(new OnSuccess<Object>() { public void onSuccess(Object result) { logger.info(result.toString()); } }, ec); } }, ec);
可以看到我們在客戶端每2秒將傳送一個新的訊息,這個訊息以“hello-”開頭,後邊是一個不斷自增的數字。當收到處理結果後,客戶端還會將結果打印出來。
我們以3個服務端節點(host相同,埠分別為2551、2552及隨機)、1個客戶端節點(埠隨機)組成的叢集為例,我們首先啟動第一個種子節點,然後以任意順序啟動其它服務端或者客戶端節點(啟動順序問題在《使用Akka構建叢集(一)》一文中已介紹,此處不再贅述),叢集成員變化的日誌如下圖:
從上面展示的日誌中可以看到叢集的3個服務端節點和1個客戶端節點先後加入叢集的資訊。
我們再來看看埠為57222的角色為frontend的節點的日誌資訊,如下圖:
從frontend的日誌看出,它已經列印了大寫得HELLO-3到HELLO-10十條任務處理結果。那麼這些任務分別是由叢集中的哪些節點負責處理的?我們首先來看看埠為2551的backend節點,其處理任務的日誌如下圖:
看來2551節點處理了hello-4、hello-7及hello-10三條任務。我們再來看看埠為2552的backend節點,其處理任務的日誌如下圖:
可以看到2552節點處理了hello-2、hello-5及hello-8三條任務。最後看看埠為57211的backend節點,其處理任務的日誌如下圖:
可以看到從hello-3到hello-10這8條處理任務被均衡的分配給了3個不同的後端節點處理。奇怪的是hello-1這條訊息居然沒有任何顯示,那是因為前端節點剛開始處理訊息時,backends列表裡還沒有快取好任何backend的ActorRef。我們向上查詢frontend節點的日誌,在相隔很遠的日誌中發現了下面的輸出:
這也印證了我們的猜測。
總結
根據本文的例子,大家應當看到使用Akka構建叢集,開發人員只需要關注訊息的傳送與接收,而無需過多涉及叢集的細節。無論前端還是後端節點都可以加入同一個叢集,而且多個後端節點處理訊息也能達到負載均衡的功效。
其它Akka應用的博文如下:
- 《Spring與Akka的整合》;
- 《使用Akka的遠端呼叫》;
- 《使用Akka構建叢集(一)》;
- 《使用Akka構建叢集(二)》;
- 《使用Akka持久化——持久化與快照》;
- 《使用Akka持久化——訊息傳送與接收》;