spring boot 執行緒任務
阿新 • • 發佈:2018-12-11
//任務模組 @Service public class TaskManager extends SimpleEventDispatcher<BaseTask> { private static final Logger log = LoggerFactory.getLogger(DatabaseManager.class); static private TaskManager instance; static public TaskManager getInstance() { return instance; } @Autowired public TaskManager() { super(1000); TaskManager.instance = this; } @Override protected void dispatchOnIdle() { super.dispatchOnIdle(); } @Override protected void dispatchOnExit() { super.dispatchOnExit(); } @Override protected void dispatchInWorkThread(BaseTask task) { try { if (task.getType() == BaseTask.TYPE_BUY_TASK) { BuyTask buyTask = (BuyTask) task; processBuyTask(buyTask); } else if (task.getType() == BaseTask.TYPE_OPERATOR_TASK) { OperatorTask operatorTask = (OperatorTask) task; processOperator(operatorTask); } else if (task.getType() == BaseTask.TYPE_GET_COORDINATE) { GetCoordinateTask getCoordinateTask = (GetCoordinateTask) task; processGetCoordinate(getCoordinateTask); } } catch (Exception e) { e.printStackTrace(); log.warn("exception: {}", e.toString()); } } private void processOperator(OperatorTask operatorTask){ //todo } ... }
任務基類(SimpleEventDispatcher)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * Single-threaded event dispatcher backed by an unbounded blocking queue.
 *
 * <p>Events handed to {@link #put(Object)} are consumed one at a time, in queue order, by one
 * dedicated worker thread that is started from the constructor. Subclasses override the
 * {@code dispatch*} hooks, all of which run on that worker thread.
 *
 * <p>Note: because the worker is started inside the constructor, a hook may in principle be
 * invoked before a subclass constructor has finished. An unchecked exception thrown by a hook
 * is not caught here and terminates the worker thread.
 *
 * @param <EventType> type of the queued events
 */
public class SimpleEventDispatcher<EventType> {
    /** Idle timeout, in milliseconds, used by the no-argument constructor. */
    private static final long DEFAULT_IDLE_TIMEOUT_MS = 2500;

    private final BlockingQueue<EventType> blockingQueue = new LinkedBlockingDeque<>();
    private final WorkerThread mWorkerThread;
    /** Maximum wait for an event, and minimum spacing of idle callbacks, in milliseconds. */
    private final long idleTimeout;
    /** Set by {@link #shutdown()}; keeps the stop request even if a hook swallows the interrupt. */
    private volatile boolean shutdownRequested;

    /** Creates a dispatcher with the default idle timeout (2500 ms) and starts its worker. */
    protected SimpleEventDispatcher() {
        this(DEFAULT_IDLE_TIMEOUT_MS);
    }

    /**
     * Creates a dispatcher and starts its worker thread.
     *
     * @param idleTimeoutArg idle timeout in milliseconds
     */
    protected SimpleEventDispatcher(long idleTimeoutArg) {
        idleTimeout = idleTimeoutArg;
        mWorkerThread = new WorkerThread(blockingQueue);
        mWorkerThread.start();
    }

    /**
     * Enqueues an event for the worker thread.
     *
     * @param event the event to enqueue; {@code null} is ignored
     */
    public void put(EventType event) {
        if (event != null) {
            blockingQueue.add(event);
        }
    }

    /**
     * Asks the worker thread to stop. The worker leaves its loop, calls
     * {@link #dispatchOnExit()} and terminates; events still in the queue are not dispatched.
     */
    public void shutdown() {
        shutdownRequested = true;
        mWorkerThread.interrupt();
    }

    /**
     * Called on the worker thread for every dequeued event. Default: no-op.
     *
     * @param event the dequeued event, never {@code null}
     */
    protected void dispatchInWorkThread(EventType event) {
    }

    /**
     * Called on the worker thread when no event arrived within the idle timeout, or after an
     * event when at least the idle timeout has passed since the previous idle callback.
     * Default: no-op.
     */
    protected void dispatchOnIdle() {
    }

    /** Called once on the worker thread after its loop has ended. Default: no-op. */
    protected void dispatchOnExit() {
    }

    /** Worker thread that drains the queue and invokes the dispatch hooks. */
    public class WorkerThread extends Thread {
        private final BlockingQueue<EventType> queue;

        /**
         * @param queue the queue this worker polls
         */
        public WorkerThread(BlockingQueue<EventType> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            long lastIdleAt = System.currentTimeMillis();
            while (!shutdownRequested && !isInterrupted()) {
                EventType event;
                try {
                    event = queue.poll(idleTimeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // poll() clears the interrupt flag when it throws. The original code
                    // swallowed the exception and kept looping, so shutdown() never stopped
                    // the worker. Restore the flag and leave the loop instead.
                    interrupt();
                    break;
                }
                if (event != null) {
                    dispatchInWorkThread(event);
                    if (System.currentTimeMillis() - lastIdleAt < idleTimeout) {
                        // Busy and the idle callback is not yet due.
                        continue;
                    }
                }
                dispatchOnIdle();
                lastIdleAt = System.currentTimeMillis();
            }
            dispatchOnExit();
        }
    }
}
TaskManager 繼承自基類 SimpleEventDispatcher,由基類內部的工作執行緒負責處理任務;任務資料透過 put() 放入佇列。
/**
 * Handles {@code POST reqOperator}: parses the request body, wraps it in an
 * {@code OperatorTask} and hands it to {@code TaskManager}. The returned
 * {@link DeferredResult} is passed to the task together with the request.
 *
 * @param reqBody raw request body
 * @return a deferred result with a 10-second timeout; already completed with
 *         {@code ResponseError.BodyContentInvalid} when the body cannot be parsed or is
 *         invalid, or with {@code ResponseError.OperatorFailed} when the task cannot be
 *         created or enqueued
 */
@PostMapping(value = "reqOperator")
public DeferredResult<ResponseData> reqOperator(@RequestBody String reqBody) {
    // Timeout is in milliseconds: 10 seconds.
    DeferredResult<ResponseData> future = new DeferredResult<>(10_000L);
    log.info("reqOperator req:{}", reqBody);

    Pkg.ReqOperator req = Utility.parseRequestBody(reqBody, Pkg.ReqOperator.class);
    if (req == null || !req.isValid()) {
        future.setResult(ResponseError.BodyContentInvalid);
        // Stop here: the original fell through and still enqueued a task built from the
        // null/invalid request.
        return future;
    }

    try {
        TaskManager.getInstance().put(new OperatorTask(req, future));
    } catch (Exception e) {
        log.warn("reqOperator failed to enqueue task", e);
        future.setResult(ResponseError.OperatorFailed);
    }
    return future;
}
// Note: DeferredResult is used here much like a C++11 future.
這樣一來,併發進來的請求資料會先放入佇列,再由工作執行緒依序逐一處理。