NC57 定時任務TaskExecutor 控制單個任務
阿新 • • 發佈:2018-12-16
/* * @(#)TaskExecutor.java 1.0 2004-10-12 * * Copyright 2005 UFIDA Software Co. Ltd. All rights reserved. * UFIDA PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package nc.bs.uap.scheduler.impl; import nc.bs.framework.common.IAttributeManager; import nc.bs.framework.common.NCLocator; import nc.bs.framework.component.RemoteProcessComponetFactory; import nc.bs.framework.execute.Executor; import nc.bs.framework.task.ICRController; import nc.bs.framework.tx.NoSpecified; import nc.bs.logging.Logger; import nc.bs.uap.scheduler.ITask; import nc.bs.uap.scheduler.Messages; import nc.itf.uap.scheduler.IExecutorService; import nc.vo.uap.scheduler.TaskStatus; import nc.vo.uap.scheduler.TaskWrapper; /** * 任務的執行器。對經過排程的任務進行處理和執行。由任務執行器池生成。 * <p> * 在接到任務後首先開始執行任務,執行完畢後,把自己加入到任務執行器池的空閒執行緒佇列;自身進入等待狀態,等待任務執行器池喚醒。 * 如果沒有任務,則進入IDLE的等待狀態,如果超時還沒有分配的任務,則將通知任務執行器池。任務執行器池根據設定的規則處理此任務執行器。 * * @author yanglei * @author hgy 5.3 * @since 3.0 */ public class TaskExecutor extends Executor { private volatile TaskWrapper taskWrapper = null; /* 任務執行器所歸屬的執行緒池 */ private TaskExecutorPool pool = null; private RemoteProcessComponetFactory factory; private IAttributeManager requestAttrMgr; private final ICRController crController; private static boolean SCHEDULE_TEST = "test".equals(System .getProperty("nc.scheduler.mode")); /** * 構造任務執行器 * * @param name * 任務執行器的名稱。 * @param pool * 任務執行器所歸屬的執行緒池。 */ TaskExecutor(String name, TaskExecutorPool pool) { super(name); this.pool = pool; crController = NCLocator.getInstance().lookup(ICRController.class); } /** * 執行器執行的主過程。 */ @Override public void run() { for (;;) { manageNullTask(); if (getTaskWrapper() == null) { if (pool.notifyTimeOut(this)) return; else { continue; } } else { try { executeTask(); } finally { TaskWrapper tw = getTaskWrapper(); setTaskWrapper(null); if (pool.notifyFree(this, tw)) return; } } } } /** * 執行任務。 */ private void executeTask() { boolean hasException = false; TaskWrapper tw = getTaskWrapper(); // Add By 門志永@雲達/新華書店專案2013-4-21 START 執行特定的後臺任務 /** * XHFX物流介面_分書同步 * XHFX物流介面_運單同步 */ if(tw.getTask().getType().equals("1")&&!tw.getTask().getName().equals("XHFX物流介面_運單333同步"))//同步BD_INVBASDOC_PKINVTEMP品種到中間庫 return; tw.setStatus(TaskStatus.RUNNING); logBeginExecute(tw); ITask task = tw.getTask(); try { Logger.init(Messages.LOGGER_ID); long now = System.currentTimeMillis(); TaskWrapper.taskWrapperContext.set(tw); bgProcess(tw, true, null); tw.setThread(Thread.currentThread()); if (SCHEDULE_TEST || task.getTaskBody().getClass().getAnnotation( NoSpecified.class) != null) { Messages.log.debug("execute task without default transaction: " + task.getTaskBody()); task.getTaskBody().execute(); } else { IExecutorService es = (IExecutorService) NCLocator .getInstance().lookup(IExecutorService.class.getName()); es.executeTask_RequiresNew(task.getTaskBody()); } tw.setStatus(TaskStatus.FINISHED); logExecuteFinished(tw, System.currentTimeMillis() - now); } catch (Exception e) { tw.setStatus(TaskStatus.FAILED); executeTaskException(tw, e); hasException = true; } finally { executeTaskFinally(hasException, tw); tw.setThread(null); if (tw.getCRToken() != null) { crController.releaseToken(tw.getCRToken()); tw.setCRToken(null); } TaskWrapper.taskWrapperContext.set(null); Logger.reset(); } } /** * 執行時發生異常。 * * @param task * @param e */ private void executeTaskException(TaskWrapper tw, Exception e) { if (e != null) { logExecuteTaskException(tw, e); } else { try { tw.getTask().getTaskBody().cancelExecute(); } catch (Exception ex) { logException(ex); } } bgProcess(tw, false, e); } /** * 任務執行完畢,將該任務的物件引用設定為<tt>null</tt>,並且通知由執行器池來處理該執行器。 * * @return */ private void executeTaskFinally(boolean hasException, TaskWrapper tw) { if (!hasException) { bgProcess(tw, false, null); } RemoteProcessComponetFactory factory = getRemoteProcessComponetFactory(); if (factory != null) { factory.clearThreadScopePostProcess(); } try { IAttributeManager requestAttrMgr = getAttributeManager(); if (requestAttrMgr != null) { requestAttrMgr.clear(); } } catch (Throwable thr) { Messages.log.error("Background executeTaskFinally error", thr); } } /** * 如果該任務為<tt>null</tt>,則只等待一個超時時間。 */ private void manageNullTask() { synchronized (this) { if (getTaskWrapper() == null) { try { wait(pool.getExecutorIdleTimeOut()); } catch (InterruptedException e) { logException(e); } } } } /** * 將任務傳給任務執行器,並喚醒之(由執行緒池呼叫)。 * * @param taskWrapper * 任務包裝類,包括任務ID,任務物件,任務優先順序 */ synchronized void execute(TaskWrapper taskWrapper) { setTaskWrapper(taskWrapper); notifyAll(); } public TaskWrapper getTaskWrapper() { return taskWrapper; } private void setTaskWrapper(TaskWrapper taskWrapper) { this.taskWrapper = taskWrapper; } private void logBeginExecute(TaskWrapper tw) { Messages.log.debug("Begin " + tw); } /** * 對任務的執行產生的異常進行記錄。 * * @param task * @param e */ private void logExecuteTaskException(TaskWrapper tw, Exception e) { Messages.log.error("Error execute " + tw, e); } /** * 對異常進行記錄。 * * @param e */ private void logException(Throwable e) { Messages.log.warn(Messages.getString("TaskExecutor.exception"), e); } /** * 對任務執行完畢進行記錄。 * * @param e */ private void logExecuteFinished(TaskWrapper tw, long time) { Messages.log.debug("Finished " + tw + " spend time: " + time); } private RemoteProcessComponetFactory getRemoteProcessComponetFactory() { if (factory == null) { try { factory = (RemoteProcessComponetFactory) NCLocator .getInstance().lookup("RemoteProcessComponetFactory"); } catch (Throwable thr) { Logger.error("Maybe the BizServer has not been started", thr); } } return factory; } private IAttributeManager getAttributeManager() { if (requestAttrMgr == null) { try { requestAttrMgr = (IAttributeManager) NCLocator .getInstance() .lookup( "nc.bs.framework.core.service.RequestAttributeManager"); } catch (Throwable thr) { Logger.error("Maybe the BizServer has not been started", thr); } } return requestAttrMgr; } private void bgProcess(TaskWrapper tw, boolean pre, Throwable t) { try { RemoteProcessComponetFactory factory = getRemoteProcessComponetFactory(); if (factory != null) { if (pre) { factory.preProcess(); } else { if (t == null) { factory.postProcess(); } else { factory.postErrorProcess(t); } } } } catch (Throwable thr) { Messages.log.error("Background process error", thr); } } }