
A Look at the Storm Worker's Executor and Task

This article examines the executor and task inside a storm worker.

Worker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java

    public static void main(String[] args) throws Exception {
        Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length);
        String stormId = args[0];
        String assignmentId = args[1];
        String supervisorPort = args[2];
        String portStr = args[3];
        String workerId = args[4];
        Map<String, Object> conf = ConfigUtils.readStormConfig();
        Utils.setupDefaultUncaughtExceptionHandler();
        StormCommon.validateDistributedMode(conf);
        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort),
                                   Integer.parseInt(portStr), workerId);
        worker.start();
        Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
    }
  • The main method creates a Worker and then calls start

Worker.start

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java

    public void start() throws Exception {
        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
                 ConfigUtils.maskPasswords(conf));
        // because in local mode, its not a separate
        // process. supervisor will register it in this case
        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
        if (!ConfigUtils.isLocalMode(conf)) {
            // Distributed mode
            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
            String pid = Utils.processPid();
            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
                                        Charset.forName("UTF-8"));
        }
        final Map<String, Object> topologyConf =
            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
        ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
        IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
        IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
        StormMetricRegistry.start(conf, DaemonType.WORKER);
        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
        Map<String, String> initCreds = new HashMap<>();
        if (initialCredentials != null) {
            initCreds.putAll(initialCredentials.get_creds());
        }
        autoCreds = ClientAuthUtils.getAutoCredentials(topologyConf);
        subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);
        Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
            () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
        );
    }
  • The key step here is the call to loadWorker

Worker.loadWorker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java

	private AtomicReference<List<IRunningExecutor>> executorsAtom;

    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
                              Map<String, String> initCreds, Credentials initialCredentials)
        throws Exception {
        workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId,
                                      topologyConf, stateStorage, stormClusterState, autoCreds);

        // Heartbeat here so that worker process dies if this fails
        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
        // that worker is running and moves on
        doHeartBeat();

        executorsAtom = new AtomicReference<>(null);

        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
        // to the supervisor
        workerState.heartbeatTimer
            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
                try {
                    doHeartBeat();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

        workerState.executorHeartbeatTimer
            .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
                               Worker.this::doExecutorHeartbeats);

        workerState.registerCallbacks();

        workerState.refreshConnections(null);

        workerState.activateWorkerWhenAllConnectionsReady();

        workerState.refreshStormActive(null);

        workerState.runWorkerStartHooks();

        List<Executor> execs = new ArrayList<>();
        for (List<Long> e : workerState.getLocalExecutors()) {
            if (ConfigUtils.isLocalMode(topologyConf)) {
                Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
                execs.add(executor);
                for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                    workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
                }
            } else {
                Executor executor = Executor.mkExecutor(workerState, e, initCreds);
                for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                    workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
                }
                execs.add(executor);
            }
        }

        List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
        for (Executor executor : execs) {
            newExecutors.add(executor.execute());
        }
        executorsAtom.set(newExecutors);

        //......

        setupFlushTupleTimer(topologyConf, newExecutors);
        setupBackPressureCheckTimer(topologyConf);

        LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf));
        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
        return this;
    }
  • workerState.getLocalExecutors() returns the set of executor ids assigned to this worker; each id is a List<Long>
  • Executor.mkExecutor then creates an Executor for each id; calling execute() wraps each one in an ExecutorShutdown, and the resulting list is stored in AtomicReference<List<IRunningExecutor>> executorsAtom

WorkerState.getLocalExecutors

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java

    // local executors and localTaskIds running in this worker
    final Set<List<Long>> localExecutors;

    public Set<List<Long>> getLocalExecutors() {
        return localExecutors;
    }

    public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId,
                       int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
                       IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials) throws IOException,
        InvalidTopologyException {
        this.autoCredentials = autoCredentials;
        this.conf = conf;
        this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
        //......
    }

    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
                                                 int port) {
        LOG.info("Reading assignments");
        List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
        executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
        Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
            NodeInfo nodeInfo = entry.getValue();
            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
                executorsAssignedToThisWorker.add(entry.getKey());
            }
        }
        return executorsAssignedToThisWorker;
    }

    private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) {
        if (!ConfigUtils.isLocalMode(conf)) {
            try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(),
                                                                                          supervisorPort)) {
                Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId);
                return assignment;
            } catch (Throwable tr1) {
                //if any error/exception thrown, fetch it from zookeeper
                return stormClusterState.remoteAssignmentInfo(topologyId, null);
            }
        } else {
            return stormClusterState.remoteAssignmentInfo(topologyId, null);
        }
    }
  • The WorkerState constructor uses readWorkerExecutors to determine the executor ids that run in this worker
  • readWorkerExecutors obtains the Assignment via getLocalAssignment, calls get_executor_node_port to get Map<List<Long>, NodeInfo> executorToNodePort, and keeps the entries whose node and port match this worker
  • getLocalAssignment first asks the local supervisor via supervisorClient.getClient().getLocalAssignmentForStorm(topologyId); if that throws, it falls back to stormClusterState.remoteAssignmentInfo, which reads from zookeeper

StormClusterStateImpl.remoteAssignmentInfo

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

    public Assignment remoteAssignmentInfo(String stormId, Runnable callback) {
        if (callback != null) {
            assignmentInfoCallback.put(stormId, callback);
        }
        byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
        return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
    }
  • The zookeeper path is derived from the topologyId via ClusterUtils.assignmentPath, and the data is fetched from that node
  • The data is thrift-serialized, so it must be deserialized after being fetched
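
ClusterUtils.maybeDeserialize hides the thrift plumbing. As a rough illustration, the deserialization step amounts to something like the sketch below, written against plain Apache Thrift (Storm actually bundles a shaded thrift, and its helper differs in detail):

    import org.apache.storm.generated.Assignment;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TBinaryProtocol;

    // Sketch: turn the raw bytes fetched from zookeeper back into an Assignment.
    public static Assignment deserializeAssignment(byte[] serialized) throws TException {
        if (serialized == null) {
            return null; // the assignment node may not exist yet
        }
        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
        Assignment assignment = new Assignment();
        deserializer.deserialize(assignment, serialized);
        return assignment;
    }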

ClusterUtils.assignmentPath

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

    public static final String ZK_SEPERATOR = "/";

    public static final String ASSIGNMENTS_ROOT = "assignments";

    public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;

    public static String assignmentPath(String id) {
        return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
    }
  • The path is /assignments/{topology}, for example /assignments/DemoTopology-1-1539163962
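
A quick sanity check of the path construction, reusing the example topology id from above:

    // prints "/assignments/DemoTopology-1-1539163962"
    System.out.println(ClusterUtils.assignmentPath("DemoTopology-1-1539163962"));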

Executor.mkExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
        Executor executor;

        WorkerTopologyContext workerTopologyContext = workerState.getWorkerTopologyContext();
        List<Integer> taskIds = StormCommon.executorIdToTasks(executorId);
        String componentId = workerTopologyContext.getComponentId(taskIds.get(0));

        String type = getExecutorType(workerTopologyContext, componentId);
        if (ClientStatsUtil.SPOUT.equals(type)) {
            executor = new SpoutExecutor(workerState, executorId, credentials);
        } else {
            executor = new BoltExecutor(workerState, executorId, credentials);
        }

        int minId = Integer.MAX_VALUE;
        Map<Integer, Task> idToTask = new HashMap<>();
        for (Integer taskId : taskIds) {
            minId = Math.min(minId, taskId);
            try {
                Task task = new Task(executor, taskId);
                idToTask.put(taskId, task);
            } catch (IOException ex) {
                throw Utils.wrapInRuntime(ex);
            }
        }

        executor.idToTaskBase = minId;
        executor.idToTask = Utils.convertToArray(idToTask, minId);
        return executor;
    }
  • Depending on the component type, either a SpoutExecutor or a BoltExecutor is created
  • The tasks belonging to the executor are then created and bound to it; the executor id encodes which task ids those are, as the sketch below shows
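
An executor id is a two-element list [startTaskId, endTaskId], and StormCommon.executorIdToTasks expands that range into the task ids the executor owns. A minimal sketch of this behavior (not the verbatim StormCommon code):

    // Sketch: expand an executor id such as [2, 4] into task ids [2, 3, 4].
    public static List<Integer> executorIdToTasks(List<Long> executorId) {
        List<Integer> taskIds = new ArrayList<>();
        for (int taskId = executorId.get(0).intValue(); taskId <= executorId.get(1).intValue(); taskId++) {
            taskIds.add(taskId);
        }
        return taskIds;
    }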

Executor.execute

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    /**
     * separated from mkExecutor in order to replace executor transfer in executor data for testing.
     */
    public ExecutorShutdown execute() throws Exception {
        LOG.info("Loading executor tasks " + componentId + ":" + executorId);

        String handlerName = componentId + "-executor" + executorId;
        Utils.SmartThread handler =
            Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);

        LOG.info("Finished loading executor " + componentId + ":" + executorId);
        return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask, receiveQueue);
    }
  • Utils.asyncLoop creates a Utils.SmartThread and, since startImmediately is true, starts it right away

Utils.asyncLoop

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/Utils.java

    /**
     * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous
     * call.
     *
     * The given afn may be a callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable
     * that in turn returns the number of seconds to sleep. In the latter case isFactory.
     *
     * @param afn              the code to call on each iteration
     * @param isDaemon         whether the new thread should be a daemon thread
     * @param eh               code to call when afn throws an exception
     * @param priority         the new thread's priority
     * @param isFactory        whether afn returns a callable instead of sleep seconds
     * @param startImmediately whether to start the thread before returning
     * @param threadName       a suffix to be appended to the thread name
     * @return the newly created thread
     *
     * @see Thread
     */
    public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
                                        int priority, final boolean isFactory, boolean startImmediately,
                                        String threadName) {
        SmartThread thread = new SmartThread(new Runnable() {
            public void run() {
                try {
                    final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call() : afn;
                    while (true) {
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                        final Long s = fn.call();
                        if (s == null) { // then stop running it
                            break;
                        }
                        if (s > 0) {
                            Time.sleep(s);
                        }
                    }
                } catch (Throwable t) {
                    if (Utils.exceptionCauseIsInstanceOf(
                        InterruptedException.class, t)) {
                        LOG.info("Async loop interrupted!");
                        return;
                    }
                    LOG.error("Async loop died!", t);
                    throw new RuntimeException(t);
                }
            }
        });
        if (eh != null) {
            thread.setUncaughtExceptionHandler(eh);
        } else {
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t, Throwable e) {
                    LOG.error("Async loop died!", e);
                    Utils.exitProcess(1, "Async loop died!");
                }
            });
        }
        thread.setDaemon(isDaemon);
        thread.setPriority(priority);
        if (threadName != null && !threadName.isEmpty()) {
            thread.setName(thread.getName() + "-" + threadName);
        }
        if (startImmediately) {
            thread.start();
        }
        return thread;
    }
  • The run method loops forever calling fn.call(); because isFactory is true here, afn.call() is invoked once to obtain the real loop body, so each iteration effectively runs Executor.call().call()
  • BoltExecutor.call mainly invokes receiveQueue.consume
  • SpoutExecutor.call invokes receiveQueue.consume as well, and additionally calls spouts.get(j).nextTuple(); both loop bodies are sketched below
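
Neither call() implementation is shown above; a condensed, hedged sketch of their shape (throttling, back pressure, and metrics omitted) looks like this:

    // BoltExecutor.call, condensed: each iteration drains the receive queue.
    public Callable<Long> call() throws Exception {
        return () -> {
            receiveQueue.consume(BoltExecutor.this);
            return 0L; // 0 tells the async loop to run again without sleeping
        };
    }

    // SpoutExecutor.call, condensed: drain the queue, then poll every spout
    // instance in this executor for its next tuple.
    public Callable<Long> call() throws Exception {
        return () -> {
            receiveQueue.consume(SpoutExecutor.this);
            for (int j = 0; j < spouts.size(); j++) {
                spouts.get(j).nextTuple();
            }
            return 0L;
        };
    }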

receiveQueue.consume

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java

    /**
     * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
     */
    public int consume(JCQueue.Consumer consumer) {
        return consume(consumer, continueRunning);
    }

    /**
     * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
     * elements consumed from Q
     */
    public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
        try {
            return consumeImpl(consumer, exitCond);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
     *
     * @param consumer
     * @param exitCond
     */
    private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
        int drainCount = 0;
        while (exitCond.keepRunning()) {
            Object tuple = recvQueue.poll();
            if (tuple == null) {
                break;
            }
            consumer.accept(tuple);
            ++drainCount;
        }

        int overflowDrainCount = 0;
        int limit = overflowQ.size();
        while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
            Object tuple = overflowQ.poll();
            ++overflowDrainCount;
            consumer.accept(tuple);
        }
        int total = drainCount + overflowDrainCount;
        if (total > 0) {
            consumer.flush();
        }
        return total;
    }
  • consume drains the queue, handing each element to the consumer's accept method, and calls flush once anything has been consumed
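
The Consumer side of the contract is small. A hypothetical stand-alone consumer (names invented for illustration; in the worker, the Executor itself is the Consumer) could look like:

    // Hypothetical consumer: count the tuples drained from a JCQueue.
    JCQueue.Consumer countingConsumer = new JCQueue.Consumer() {
        private int seen = 0;

        @Override
        public void accept(Object event) {
            seen++; // handle one element taken off the queue
        }

        @Override
        public void flush() {
            System.out.println("batch done, total seen: " + seen);
        }
    };
    int drained = queue.consume(countingConsumer); // queue is some JCQueue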

Task

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

public class Task {

    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private final TaskMetrics taskMetrics;
    private Executor executor;
    private WorkerState workerData;
    private TopologyContext systemTopologyContext;
    private TopologyContext userTopologyContext;
    private WorkerTopologyContext workerTopologyContext;
    private Integer taskId;
    private String componentId;
    private Object taskObject; // Spout/Bolt object
    private Map<String, Object> topoConf;
    private BooleanSupplier emitSampler;
    private CommonStats executorStats;
    private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
    private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers;
    private boolean debug;

    public Task(Executor executor, Integer taskId) throws IOException {
        this.taskId = taskId;
        this.executor = executor;
        this.workerData = executor.getWorkerData();
        this.topoConf = executor.getTopoConf();
        this.componentId = executor.getComponentId();
        this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
        this.streamToGroupers = getGroupersPerStream(streamComponentToGrouper);
        this.executorStats = executor.getStats();
        this.workerTopologyContext = executor.getWorkerTopologyContext();
        this.emitSampler = ConfigUtils.mkStatsSampler(topoConf);
        this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
        this.userTopologyContext = mkTopologyContext(workerData.getTopology());
        this.taskObject = mkTaskObject();
        this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG);
        this.addTaskHooks();
        this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId);
    }

    //......
}
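Of note in the constructor: taskObject is the actual spout or bolt instance for this task. A condensed sketch of mkTaskObject (state spouts and error handling omitted) gives the idea:

    // Condensed sketch of Task.mkTaskObject: look up this task's component in the
    // topology and deserialize its spout/bolt object.
    private Object mkTaskObject() {
        StormTopology topology = systemTopologyContext.getRawTopology();
        if (topology.get_spouts().containsKey(componentId)) {
            return Utils.getSetComponentObject(topology.get_spouts().get(componentId).get_spout_object());
        } else if (topology.get_bolts().containsKey(componentId)) {
            return Utils.getSetComponentObject(topology.get_bolts().get(componentId).get_bolt_object());
        }
        throw new RuntimeException("Could not find " + componentId + " in topology");
    }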

Executor.accept

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    @Override
    public void accept(Object event) {
        AddressedTuple addressedTuple = (AddressedTuple) event;
        int taskId = addressedTuple.getDest();

        TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
        if (isDebug) {
            LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
        }

        try {
            if (taskId != AddressedTuple.BROADCAST_DEST) {
                tupleActionFn(taskId, tuple);
            } else {
                for (Integer t : taskIds) {
                    tupleActionFn(t, tuple);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
  • accept reads the destination taskId from the AddressedTuple and calls tupleActionFn for it; for a broadcast destination it calls tupleActionFn for every taskId
  • BoltExecutor.tupleActionFn fetches the bolt object from the task and calls boltObject.execute(tuple); a sketch follows this list
  • SpoutExecutor.tupleActionFn takes the TupleInfo out of RotatingMap<Long, TupleInfo> pending and acks or fails the tuple accordingly
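
A condensed, hedged sketch of the bolt-side tupleActionFn (the special handling of system streams such as credentials changes and metrics ticks is omitted):

    // BoltExecutor.tupleActionFn, condensed: route the tuple to the bolt instance
    // owned by the destination task.
    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        Task task = idToTask.get(taskId - idToTaskBase);
        IBolt boltObject = (IBolt) task.getTaskObject();
        boltObject.execute(tuple);
    }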

ExecutorShutdown

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java

public class ExecutorShutdown implements Shutdownable, IRunningExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);

    private final Executor executor;
    private final List<Utils.SmartThread> threads;
    private final ArrayList<Task> taskDatas;
    private final JCQueue receiveQueue;

    //......

    @Override
    public void credentialsChanged(Credentials credentials) {
        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials),
                                        Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID,
                                        Constants.CREDENTIALS_CHANGED_STREAM_ID);
        AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
        try {
            executor.getReceiveQueue().publish(addressedTuple);
            executor.getReceiveQueue().flush();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void loadChanged(LoadMapping loadMapping) {
        executor.reflectNewLoadMapping(loadMapping);
    }

    @Override
    public JCQueue getReceiveQueue() {
        return receiveQueue;
    }

    @Override
    public boolean publishFlushTuple() {
        return executor.publishFlushTuple();
    }

    @Override
    public void shutdown() {
        try {
            LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
            executor.getReceiveQueue().close();
            for (Utils.SmartThread t : threads) {
                t.interrupt();
            }
            for (Utils.SmartThread t : threads) {
                LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName());
                t.join();
            }
            executor.getStats().cleanupStats();
            for (Task task : taskDatas) {
                if (task == null) {
                    continue;
                }
                TopologyContext userContext = task.getUserContext();
                for (ITaskHook hook : userContext.getHooks()) {
                    hook.cleanup();
                }
            }
            executor.getStormClusterState().disconnect();
            if (executor.getOpenOrPrepareWasCalled().get()) {
                for (Task task : taskDatas) {
                    if (task == null) {
                        continue;
                    }
                    Object object = task.getTaskObject();
                    if (object instanceof ISpout) {
                        ((ISpout) object).close();
                    } else if (object instanceof IBolt) {
                        ((IBolt) object).cleanup();
                    } else {
                        LOG.error("unknown component object");
                    }
                }
            }
            LOG.info("Shut down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}
  • ExecutorShutdown mainly wraps the shutdown sequence: close the receive queue, interrupt and join the executor threads, clean up stats and task hooks, disconnect from cluster state, and finally close the spout or clean up the bolt

Summary

  • After startup, the worker reads its assignment, falling back to the zookeeper path /assignments/{topology} (for example /assignments/DemoTopology-1-1539163962) when the local supervisor cannot be reached
  • From the assignment it obtains Map<List<Long>, NodeInfo> executorToNodePort and creates the executors via Executor.mkExecutor
  • While each Executor is created, the Tasks listed in the assignment are created and bound to it
  • Each executor's execute method then starts a Utils.SmartThread whose loop repeatedly invokes Executor.call().call()

BoltExecutor.call mainly invokes receiveQueue.consume; SpoutExecutor.call invokes receiveQueue.consume as well, and additionally calls spouts.get(j).nextTuple()

  • receiveQueue.consume ends up invoking the Executor's accept method, and accept calls tupleActionFn for each destination taskId

BoltExecutor.tupleActionFn fetches the bolt object from the task and calls boltObject.execute(tuple); SpoutExecutor.tupleActionFn takes the TupleInfo out of RotatingMap<Long, TupleInfo> pending and acks or fails it

  • A worker is a process; executors are the threads inside that process; a task is an instance of a spout or bolt. By default each executor runs exactly one spout or bolt task
  • Adding workers or executors scales the topology out across supervisors, a process called rebalance: tasks, as the unit of work, move from the executors of a loaded worker to the executors of the new workers. Note that the rebalance command can only change the worker and executor counts; the task count is fixed at submit time, as illustrated below
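
A hedged illustration of where these numbers come from (the component name and spout class are made up for the example). The task count is set when the topology is submitted; worker and executor counts can be changed later with rebalance:

    // At submit time: 2 workers; the "words" spout gets 2 executors and 4 tasks,
    // so each executor initially runs 2 task instances of the spout.
    Config conf = new Config();
    conf.setNumWorkers(2);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("words", new RandomWordSpout(), 2).setNumTasks(4);

Later, workers and executors can be adjusted without resubmitting; the 4 existing tasks are simply redistributed over the new executors:

    storm rebalance DemoTopology -n 4 -e words=4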
