1. 程式人生 > >JStorm原始碼閱讀-建立topo的流程

JStorm原始碼閱讀-建立topo的流程

  • Nimbus部分
//在單機模式下,首先要準備環境
    public static LocalClusterMap prepareLocalCluster() {
        LocalClusterMap state = new LocalClusterMap();
        try {
            List<String> tmpDirs = new ArrayList();

            String zkDir = getTmpDir();
            tmpDirs.add(zkDir);
            Factory zookeeper =
startLocalZookeeper(zkDir);//本地zk啟動! Map conf = getLocalConf(zookeeper.getZooKeeperServer().getClientPort()); String nimbusDir = getTmpDir(); tmpDirs.add(nimbusDir); Map nimbusConf = deepCopyMap(conf); nimbusConf.put(Config.STORM_LOCAL_DIR,
nimbusDir); NimbusServer instance = new NimbusServer();//本地Nimbus啟動! Map supervisorConf = deepCopyMap(conf); String supervisorDir = getTmpDir(); tmpDirs.add(supervisorDir); supervisorConf.put(Config.STORM_LOCAL_DIR, supervisorDir); Supervisor supervisor =
new Supervisor();//本地Supervisor啟動! IContext context = getLocalContext(supervisorConf); state.setNimbusServer(instance); state.setNimbus(instance.launcherLocalServer(nimbusConf, new DefaultInimbus())); state.setZookeeper(zookeeper); state.setConf(conf); state.setTmpDir(tmpDirs); state.setSupervisor(supervisor.mkSupervisor(supervisorConf, context)); return state; } catch (Exception e) { LOG.error("prepare cluster error!", e); state.clean(); } return null; } //然後是向叢集提交具體的topo public String submitTopologyWithOpts(String topologyName, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options) throws AlreadyAliveException, InvalidTopologyException, TopologyAssignException, TException { LOG.info("Receive " + topologyName + ", uploadedJarLocation:" + uploadedJarLocation); long start = System.nanoTime(); //check topologyname is valid if (!Common.charValidate(topologyName)) { throw new InvalidTopologyException(topologyName + " is not a valid topology name"); } Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils.from_json(jsonConf); if (serializedConf == null) { LOG.warn("Failed to serialized Configuration"); throw new InvalidTopologyException("Failed to serialize topology configuration"); } Common.confValidate(serializedConf, data.getConf()); boolean enableDeploy = ConfigExtension.getTopologyHotDeplogyEnable(serializedConf); try { checkTopologyActive(data, topologyName, enableDeploy); } catch (AlreadyAliveException e) { LOG.info(topologyName + " already exists "); throw e; } catch (NotAliveException e){ LOG.info(topologyName + " is not alive "); throw e; }catch (Throwable e) { LOG.info("Failed to check whether topology is alive or not", e); throw new TException(e); } if (enableDeploy){ LOG.info("deploy the topology"); try { StormClusterState stormClusterState = data.getStormClusterState(); String topologyId = Cluster.get_topology_id(stormClusterState, topologyName); if (topologyId == null) { throw new NotAliveException(topologyName); } LOG.info("start kill the old topology {}", topologyId); Map oldConf = new HashMap(); oldConf.putAll(conf); Map killedStormConf = StormConfig.read_nimbus_topology_conf(topologyId, data.getBlobStore()); if (killedStormConf != null) { oldConf.putAll(killedStormConf); } NimbusUtils.transitionName(data, topologyName, true, StatusType.kill, 0); KillTopologyEvent.pushEvent(topologyId); notifyTopologyActionListener(topologyName, "killTopology"); //wait all workers' are killed final long timeoutSeconds = ConfigExtension.getTaskCleanupTimeoutSec(oldConf); ConcurrentHashMap<String, Semaphore> topologyIdtoSem = data.getTopologyIdtoSem(); if (!topologyIdtoSem.contains(topologyId)){ topologyIdtoSem.putIfAbsent(topologyId, new Semaphore(0)); } Semaphore semaphore = topologyIdtoSem.get(topologyId); if (semaphore != null){ semaphore.tryAcquire(timeoutSeconds, TimeUnit.SECONDS); topologyIdtoSem.remove(semaphore); } LOG.info("success kill the old topology {}", topologyId); } catch (Exception e) { } } String topologyId = null; synchronized (data) { // avoid same topologies from being submitted at the same time Set<String> pendingTopologies = data.getPendingSubmitTopologies().buildMap().keySet(); for (String cachTopologyId : pendingTopologies) { if (cachTopologyId.contains(topologyName + "-")) throw new AlreadyAliveException( topologyName + " were submitted"); } int counter = data.getSubmittedCount().incrementAndGet(); topologyId = Common.topologyNameToId(topologyName, counter); data.getPendingSubmitTopologies().put(topologyId, null); } try { serializedConf.put(Config.TOPOLOGY_ID, topologyId); serializedConf.put(Config.TOPOLOGY_NAME, topologyName); Map<Object, Object> stormConf; stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology); LOG.info("Normalized configuration:" + stormConf); Map<Object, Object> totalStormConf = new HashMap<Object, Object>(conf); totalStormConf.putAll(stormConf); StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true); // this validates the structure of the topology Common.validate_basic(normalizedTopology, totalStormConf, topologyId); // don't need generate real topology, so skip Common.system_topology // Common.system_topology(totalStormConf, topology); StormClusterState stormClusterState = data.getStormClusterState(); // create /local-dir/nimbus/topologyId/xxxx files setupStormCode(conf, topologyId, uploadedJarLocation, stormConf, normalizedTopology); // wait for blob replication before activate topology waitForDesiredCodeReplication(conf, topologyId); // generate TaskInfo for every bolt or spout in ZK // /ZK/tasks/topoologyId/xxx setupZkTaskInfo(conf, topologyId, stormClusterState); //mkdir topology error directory String path = Cluster.taskerror_storm_root(topologyId); stormClusterState.mkdir(path); // make assignments for a topology LOG.info("Submit for " + topologyName + " with conf " + serializedConf); makeAssignment(topologyName, topologyId, options.get_initial_status()); // push start event after startup double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf); StartTopologyEvent.pushEvent(topologyId, metricsSampleRate); notifyTopologyActionListener(topologyName, "submitTopology"); } catch (Throwable e) { } finally { // when make assignment for a topology,so remove the topologyid form // pendingSubmitTopologys data.getPendingSubmitTopologies().remove(topologyId); } return topologyId; } //ServiceHandler.java private void makeAssignment(String topologyName, String topologyId, TopologyInitialStatus status) throws FailedAssignTopologyException { TopologyAssignEvent assignEvent = new TopologyAssignEvent(); assignEvent.setTopologyId(topologyId); assignEvent.setScratch(false); assignEvent.setTopologyName(topologyName); assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(status)); //將事件放入TopologyAssign內部的佇列中,交給執行緒(TopologyAssign)執行。 TopologyAssign.push(assignEvent); boolean isSuccess = assignEvent.waitFinish(); if (isSuccess == true) { LOG.info("Finish submit for " + topologyName); } else { throw new FailedAssignTopologyException(assignEvent.getErrorMsg()); } } //TopologyAssign.java public void run() { runFlag = true; while (runFlag) { TopologyAssignEvent event; try { event = queue.take(); } catch (InterruptedException e1) { continue; } //在其中呼叫了zkobj.setData(zk, path, data)最終向zookeeper提交了任務 boolean isSuccess = doTopologyAssignment(event); if (isSuccess == false) { } else { } } }

除錯JStorm首先檢查TopologyAssign執行緒是否從佇列中取到數了。