JStorm之Topology調度
阿新 • • 發佈:2018-02-10
監聽事件 當前 min ng- spa 標準 bus getc courier
? topology在服務端提交過程中,會經過一系列的驗證和初始化:TP結構校驗、創建本地文件夾並拷貝序列化文件jar包、生成znode用於存放TP和task等信息,最後一步才進行任務分配。例如以下圖:
提交主函數位於ServiceHandler.java中
這當中最基本的是事件丟入隊列後興許的處理過程。事件分配由TopologyAssign線程處理,這個線程的流程非常清晰,監聽事件隊列。一旦有事件進入,立即取出,進行doTopologyAssignment,例如以下:
任務分配的核心代碼位於TopologyAssign.java中
調用棧例如以下:
分配原理是首先獲得全部可用的supervisor,推斷supervisor可用的標準是是否有空暇的slot,也就是是否全部supervisor.slots.ports指定port都被占用,然後計算出須要分配幾個woker。由於一個woker相應一個port,當然這些信息的採集都是來自Zookeeper,如今我們來分析分配的核心代碼:
WorkerMaker.java
//註意參數,result是這個作業須要的槽位。傳入前僅僅知道須要槽位的數量,詳細分配到哪臺supervisor上還沒指定
//supervisors指當前集群中全部可用的supervisor。即有空暇port的
從上面的代碼中我們能夠看到,眼下槽位分配沒考慮機器負載,槽位的分配並不一定平均,比方第一個supervisor有10個槽位,剩下的supervisor僅僅有兩個,那麽還是要每一個supervisor分配一個woker的。
提交主函數位於ServiceHandler.java中
private void makeAssignment(String topologyName, String topologyId, TopologyInitialStatus status) throws FailedAssignTopologyException { //1、創建topology的分配事件 TopologyAssignEvent assignEvent = new TopologyAssignEvent(); assignEvent.setTopologyId(topologyId); assignEvent.setScratch(false); assignEvent.setTopologyName(topologyName); assignEvent.setOldStatus(Thrift .topologyInitialStatusToStormStatus(status)); //2、丟入事件處理隊列 TopologyAssign.push(assignEvent); //3、等待時間返回 boolean isSuccess = assignEvent.waitFinish(); if (isSuccess == true) { LOG.info("Finish submit for " + topologyName); } else { throw new FailedAssignTopologyException( assignEvent.getErrorMsg()); } }
這當中最基本的是事件丟入隊列後興許的處理過程。事件分配由TopologyAssign線程處理,這個線程的流程非常清晰,監聽事件隊列。一旦有事件進入,立即取出,進行doTopologyAssignment,例如以下:
public void run() { LOG.info("TopologyAssign thread has been started"); runFlag = true; while (runFlag) { TopologyAssignEvent event; try { event = queue.take(); } catch (InterruptedException e1) { continue; } if (event == null) { continue; } boolean isSuccess = doTopologyAssignment(event); .............. }
任務分配的核心代碼位於TopologyAssign.java中
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception { String topologyId = event.getTopologyId(); LOG.info("Determining assignment for " + topologyId); TopologyAssignContext context = prepareTopologyAssign(event); Set<ResourceWorkerSlot> assignments = null; if (!StormConfig.local_mode(nimbusData.getConf())) { IToplogyScheduler scheduler = schedulers .get(DEFAULT_SCHEDULER_NAME); //開始進行作業的調度 assignments = scheduler.assignTasks(context); } else { assignments = mkLocalAssignment(context); } ............ }
調用棧例如以下:
分配原理是首先獲得全部可用的supervisor,推斷supervisor可用的標準是是否有空暇的slot,也就是是否全部supervisor.slots.ports指定port都被占用,然後計算出須要分配幾個woker。由於一個woker相應一個port,當然這些信息的採集都是來自Zookeeper,如今我們來分析分配的核心代碼:
WorkerMaker.java
//註意參數,result是這個作業須要的槽位。傳入前僅僅知道須要槽位的數量,詳細分配到哪臺supervisor上還沒指定
//supervisors指當前集群中全部可用的supervisor。即有空暇port的
private void putWorkerToSupervisor(List<ResourceWorkerSlot> result, List<SupervisorInfo> supervisors) { int key = 0; //按所需槽位遍歷,每次分配一個 for (ResourceWorkerSlot worker : result) { //首先進行必要的推斷和置位 if (supervisors.size() == 0) return; if (worker.getNodeId() != null) continue; if (key >= supervisors.size()) key = 0; //1、取出第一個supervisor SupervisorInfo supervisor = supervisors.get(key); worker.setHostname(supervisor.getHostName()); worker.setNodeId(supervisor.getSupervisorId()); worker.setPort(supervisor.getWorkerPorts().iterator().next()); //槽位用完則從集合中刪除,不再參與分配 supervisor.getWorkerPorts().remove(worker.getPort()); if (supervisor.getWorkerPorts().size() == 0) supervisors.remove(supervisor); //當一個supervisor分配完後便不再使用。除非supervisor不夠用 key++; } }
從上面的代碼中我們能夠看到,眼下槽位分配沒考慮機器負載,槽位的分配並不一定平均,比方第一個supervisor有10個槽位,剩下的supervisor僅僅有兩個,那麽還是要每一個supervisor分配一個woker的。
註意一個問題,在上面代碼中supervisors這個集合是經過排序的,排序規則例如以下:
private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> result, List<SupervisorInfo> supervisors) { ........... supervisors = this.getCanUseSupervisors(supervisors); Collections.sort(supervisors, new Comparator<SupervisorInfo>() { @Override public int compare(SupervisorInfo o1, SupervisorInfo o2) { // TODO Auto-generated method stub return -NumberUtils.compare(o1.getWorkerPorts().size(), o2 .getWorkerPorts().size()); } }); this.putWorkerToSupervisor(result, supervisors); ............. }能夠看到。當前排序規則是按slot多少的,我們興許版本號中可能會考慮機器負載的一些因素吧。
JStorm之Topology調度