1. 程式人生 > >NC57 定時任務TaskExecutor 控制單個任務

NC57 定時任務TaskExecutor 控制單個任務

/*
 * @(#)TaskExecutor.java 1.0 2004-10-12
 *
 * Copyright 2005 UFIDA Software Co. Ltd. All rights reserved.
 * UFIDA PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */
package nc.bs.uap.scheduler.impl;

import nc.bs.framework.common.IAttributeManager;
import nc.bs.framework.common.NCLocator;
import nc.bs.framework.component.RemoteProcessComponetFactory;
import nc.bs.framework.execute.Executor;
import nc.bs.framework.task.ICRController;
import nc.bs.framework.tx.NoSpecified;
import nc.bs.logging.Logger;
import nc.bs.uap.scheduler.ITask;
import nc.bs.uap.scheduler.Messages;
import nc.itf.uap.scheduler.IExecutorService;
import nc.vo.uap.scheduler.TaskStatus;
import nc.vo.uap.scheduler.TaskWrapper;

/**
 * 任務的執行器。對經過排程的任務進行處理和執行。由任務執行器池生成。
 * <p>
 * 在接到任務後首先開始執行任務,執行完畢後,把自己加入到任務執行器池的空閒執行緒佇列;自身進入等待狀態,等待任務執行器池喚醒。
 * 如果沒有任務,則進入IDLE的等待狀態,如果超時還沒有分配的任務,則將通知任務執行器池。任務執行器池根據設定的規則處理此任務執行器。
 * 
 * @author yanglei
 * @author hgy 5.3
 * @since 3.0
 */
public class TaskExecutor extends Executor {

	private volatile TaskWrapper taskWrapper = null;

	/* 任務執行器所歸屬的執行緒池 */
	private TaskExecutorPool pool = null;

	private RemoteProcessComponetFactory factory;

	private IAttributeManager requestAttrMgr;

	private final ICRController crController;

	private static boolean SCHEDULE_TEST = "test".equals(System
			.getProperty("nc.scheduler.mode"));

	/**
	 * 構造任務執行器
	 * 
	 * @param name
	 *            任務執行器的名稱。
	 * @param pool
	 *            任務執行器所歸屬的執行緒池。
	 */
	TaskExecutor(String name, TaskExecutorPool pool) {
		super(name);
		this.pool = pool;
		crController = NCLocator.getInstance().lookup(ICRController.class);
	}

	/**
	 * 執行器執行的主過程。
	 */
	@Override
	public void run() {
		for (;;) {
			manageNullTask();
			if (getTaskWrapper() == null) {
				if (pool.notifyTimeOut(this))
					return;
				else {
					continue;
				}
			} else {
				try {
					executeTask();
				} finally {
					TaskWrapper tw = getTaskWrapper();
					setTaskWrapper(null);
					if (pool.notifyFree(this, tw))
						return;

				}
			}
		}
	}

	/**
	 * 執行任務。
	 */
	private void executeTask() {
		boolean hasException = false;
		TaskWrapper tw = getTaskWrapper();
		// Add By 門志永@雲達/新華書店專案2013-4-21 START 執行特定的後臺任務
		/**
		 * XHFX物流介面_分書同步
		 * XHFX物流介面_運單同步
		 */
		if(tw.getTask().getType().equals("1")&&!tw.getTask().getName().equals("XHFX物流介面_運單333同步"))//同步BD_INVBASDOC_PKINVTEMP品種到中間庫
			return;
		tw.setStatus(TaskStatus.RUNNING);
		logBeginExecute(tw);
		ITask task = tw.getTask();
		try {
			Logger.init(Messages.LOGGER_ID);
			long now = System.currentTimeMillis();
			TaskWrapper.taskWrapperContext.set(tw);
			bgProcess(tw, true, null);
			tw.setThread(Thread.currentThread());
			if (SCHEDULE_TEST
					|| task.getTaskBody().getClass().getAnnotation(
							NoSpecified.class) != null) {
				Messages.log.debug("execute task without default transaction: " + task.getTaskBody());
				task.getTaskBody().execute();
			} else {
				IExecutorService es = (IExecutorService) NCLocator
						.getInstance().lookup(IExecutorService.class.getName());
				es.executeTask_RequiresNew(task.getTaskBody());
			}
			tw.setStatus(TaskStatus.FINISHED);
			logExecuteFinished(tw, System.currentTimeMillis() - now);
		} catch (Exception e) {
			tw.setStatus(TaskStatus.FAILED);
			executeTaskException(tw, e);
			hasException = true;
		} finally {
			executeTaskFinally(hasException, tw);
			tw.setThread(null);
			if (tw.getCRToken() != null) {
				crController.releaseToken(tw.getCRToken());
				tw.setCRToken(null);
			}

			TaskWrapper.taskWrapperContext.set(null);
			Logger.reset();
		}
	}

	/**
	 * 執行時發生異常。
	 * 
	 * @param task
	 * @param e
	 */
	private void executeTaskException(TaskWrapper tw, Exception e) {
		if (e != null) {
			logExecuteTaskException(tw, e);
		} else {
			try {
				tw.getTask().getTaskBody().cancelExecute();
			} catch (Exception ex) {
				logException(ex);
			}
		}

		bgProcess(tw, false, e);
	}

	/**
	 * 任務執行完畢,將該任務的物件引用設定為<tt>null</tt>,並且通知由執行器池來處理該執行器。
	 * 
	 * @return
	 */
	private void executeTaskFinally(boolean hasException, TaskWrapper tw) {
		if (!hasException) {
			bgProcess(tw, false, null);
		}

		RemoteProcessComponetFactory factory = getRemoteProcessComponetFactory();
		if (factory != null) {
			factory.clearThreadScopePostProcess();
		}

		try {
			IAttributeManager requestAttrMgr = getAttributeManager();
			if (requestAttrMgr != null) {
				requestAttrMgr.clear();
			}
		} catch (Throwable thr) {
			Messages.log.error("Background executeTaskFinally error", thr);
		}
	}

	/**
	 * 如果該任務為<tt>null</tt>,則只等待一個超時時間。
	 */
	private void manageNullTask() {
		synchronized (this) {
			if (getTaskWrapper() == null) {
				try {
					wait(pool.getExecutorIdleTimeOut());
				} catch (InterruptedException e) {
					logException(e);
				}
			}
		}
	}

	/**
	 * 將任務傳給任務執行器,並喚醒之(由執行緒池呼叫)。
	 * 
	 * @param taskWrapper
	 *            任務包裝類,包括任務ID,任務物件,任務優先順序
	 */
	synchronized void execute(TaskWrapper taskWrapper) {
		setTaskWrapper(taskWrapper);
		notifyAll();
	}

	public TaskWrapper getTaskWrapper() {
		return taskWrapper;
	}

	private void setTaskWrapper(TaskWrapper taskWrapper) {
		this.taskWrapper = taskWrapper;
	}

	private void logBeginExecute(TaskWrapper tw) {
		Messages.log.debug("Begin " + tw);
	}

	/**
	 * 對任務的執行產生的異常進行記錄。
	 * 
	 * @param task
	 * @param e
	 */
	private void logExecuteTaskException(TaskWrapper tw, Exception e) {
		Messages.log.error("Error execute " + tw, e);

	}

	/**
	 * 對異常進行記錄。
	 * 
	 * @param e
	 */
	private void logException(Throwable e) {
		Messages.log.warn(Messages.getString("TaskExecutor.exception"), e);
	}

	/**
	 * 對任務執行完畢進行記錄。
	 * 
	 * @param e
	 */
	private void logExecuteFinished(TaskWrapper tw, long time) {
		Messages.log.debug("Finished " + tw + " spend time: " + time);
	}

	private RemoteProcessComponetFactory getRemoteProcessComponetFactory() {
		if (factory == null) {
			try {
				factory = (RemoteProcessComponetFactory) NCLocator
						.getInstance().lookup("RemoteProcessComponetFactory");
			} catch (Throwable thr) {
				Logger.error("Maybe the BizServer has not been started", thr);
			}
		}
		return factory;
	}

	private IAttributeManager getAttributeManager() {
		if (requestAttrMgr == null) {
			try {
				requestAttrMgr = (IAttributeManager) NCLocator
						.getInstance()
						.lookup(
								"nc.bs.framework.core.service.RequestAttributeManager");
			} catch (Throwable thr) {
				Logger.error("Maybe the BizServer has not been started", thr);
			}
		}
		return requestAttrMgr;
	}

	private void bgProcess(TaskWrapper tw, boolean pre, Throwable t) {
		try {
			RemoteProcessComponetFactory factory = getRemoteProcessComponetFactory();
			if (factory != null) {
				if (pre) {
					factory.preProcess();
				} else {
					if (t == null) {
						factory.postProcess();
					} else {
						factory.postErrorProcess(t);
					}
				}
			}
		} catch (Throwable thr) {
			Messages.log.error("Background process error", thr);
		}
	}

}