1. 程式人生 > >JStorm之Topology調度

JStorm之Topology調度

監聽事件 當前 min ng- spa 標準 bus getc courier

? topology在服務端提交過程中,會經過一系列的驗證和初始化:TP結構校驗、創建本地文件夾並拷貝序列化文件jar包、生成znode用於存放TP和task等信息,最後一步才進行任務分配。例如以下圖:
技術分享圖片
提交主函數位於ServiceHandler.java中
private void makeAssignment(String topologyName, String topologyId, 
		TopologyInitialStatus status) throws FailedAssignTopologyException {
	//1、創建topology的分配事件
	TopologyAssignEvent assignEvent = new TopologyAssignEvent();
	assignEvent.setTopologyId(topologyId);
	assignEvent.setScratch(false);
	assignEvent.setTopologyName(topologyName);
	assignEvent.setOldStatus(Thrift
			.topologyInitialStatusToStormStatus(status));
  //2、丟入事件處理隊列
	TopologyAssign.push(assignEvent);
  //3、等待時間返回
	boolean isSuccess = assignEvent.waitFinish();
	if (isSuccess == true) {
		LOG.info("Finish submit for " + topologyName);
	} else {
		throw new FailedAssignTopologyException(
				assignEvent.getErrorMsg());
	}
}

這當中最基本的是事件丟入隊列後興許的處理過程。事件分配由TopologyAssign線程處理,這個線程的流程非常清晰,監聽事件隊列。一旦有事件進入,立即取出,進行doTopologyAssignment,例如以下:
public void run() {
	LOG.info("TopologyAssign thread has been started");
	runFlag = true;


	while (runFlag) {
		TopologyAssignEvent event;
		try {
			event = queue.take();
		} catch (InterruptedException e1) {
			continue;
		}
		if (event == null) {
			continue;
		}


		boolean isSuccess = doTopologyAssignment(event);


		..............
}

任務分配的核心代碼位於TopologyAssign.java中
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
	String topologyId = event.getTopologyId();


	LOG.info("Determining assignment for " + topologyId);


	TopologyAssignContext context = prepareTopologyAssign(event);


	Set<ResourceWorkerSlot> assignments = null;


	if (!StormConfig.local_mode(nimbusData.getConf())) {


		IToplogyScheduler scheduler = schedulers
				.get(DEFAULT_SCHEDULER_NAME);
		//開始進行作業的調度
		assignments = scheduler.assignTasks(context);


	} else {
		assignments = mkLocalAssignment(context);
	}
	............
}

調用棧例如以下:
技術分享圖片
分配原理是首先獲得全部可用的supervisor,推斷supervisor可用的標準是是否有空暇的slot,也就是是否全部supervisor.slots.ports指定port都被占用,然後計算出須要分配幾個woker。由於一個woker相應一個port,當然這些信息的採集都是來自Zookeeper,如今我們來分析分配的核心代碼:
WorkerMaker.java
//註意參數,result是這個作業須要的槽位。傳入前僅僅知道須要槽位的數量,詳細分配到哪臺supervisor上還沒指定
//supervisors指當前集群中全部可用的supervisor。即有空暇port的
private void putWorkerToSupervisor(List<ResourceWorkerSlot> result,
		List<SupervisorInfo> supervisors) {
	int key = 0;
	//按所需槽位遍歷,每次分配一個
	for (ResourceWorkerSlot worker : result) {
		//首先進行必要的推斷和置位
		if (supervisors.size() == 0)
			return;
		if (worker.getNodeId() != null)
			continue;
		if (key >= supervisors.size())
			key = 0;
		//1、取出第一個supervisor
		SupervisorInfo supervisor = supervisors.get(key);
		worker.setHostname(supervisor.getHostName());
		worker.setNodeId(supervisor.getSupervisorId());
		worker.setPort(supervisor.getWorkerPorts().iterator().next());
		//槽位用完則從集合中刪除,不再參與分配
		supervisor.getWorkerPorts().remove(worker.getPort());
		if (supervisor.getWorkerPorts().size() == 0)
			supervisors.remove(supervisor);
		//當一個supervisor分配完後便不再使用。除非supervisor不夠用
		key++;
	}
}

從上面的代碼中我們能夠看到,眼下槽位分配沒考慮機器負載,槽位的分配並不一定平均,比方第一個supervisor有10個槽位,剩下的supervisor僅僅有兩個,那麽還是要每一個supervisor分配一個woker的。

註意一個問題,在上面代碼中supervisors這個集合是經過排序的,排序規則例如以下:

private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> result,
		List<SupervisorInfo> supervisors) {
	...........
	supervisors = this.getCanUseSupervisors(supervisors);
	Collections.sort(supervisors, new Comparator<SupervisorInfo>() {


		@Override
		public int compare(SupervisorInfo o1, SupervisorInfo o2) {
			// TODO Auto-generated method stub
			return -NumberUtils.compare(o1.getWorkerPorts().size(), o2
					.getWorkerPorts().size());
		}


	});
	this.putWorkerToSupervisor(result, supervisors);
	.............
}
能夠看到。當前排序規則是按slot多少的,我們興許版本號中可能會考慮機器負載的一些因素吧。

JStorm之Topology調度