1. 程式人生 > >spring boot 執行緒任務

spring boot 執行緒任務

//任務模組
/**
 * Task module: a Spring {@code @Service} that receives {@link BaseTask}s through the
 * inherited {@code put()} and handles them one at a time in
 * {@link #dispatchInWorkThread(BaseTask)}, selecting the handler by task type.
 *
 * <p>The instance is also published through {@link #getInstance()} so that code
 * without an injected reference can enqueue tasks.
 */
@Service
public class TaskManager extends SimpleEventDispatcher<BaseTask> {
    // Logger is named after this class (it was previously registered under
    // DatabaseManager, which attributed every message to the wrong component).
    private static final Logger log = LoggerFactory.getLogger(TaskManager.class);

    /** The most recently constructed instance; assigned in the constructor. */
    private static TaskManager instance;

    /**
     * Returns the instance assigned by the constructor.
     *
     * @return the current instance, or {@code null} if none has been constructed yet
     */
    public static TaskManager getInstance() {
        return instance;
    }

    /** Creates the manager with a 1000 ms idle timeout and publishes it as the shared instance. */
    @Autowired
    public TaskManager() {
        super(1000);
        TaskManager.instance = this;
    }

    /** Idle hook; currently only delegates to the base implementation. */
    @Override
    protected void dispatchOnIdle() {
        super.dispatchOnIdle();
    }

    /** Exit hook; currently only delegates to the base implementation. */
    @Override
    protected void dispatchOnExit() {
        super.dispatchOnExit();
    }

    /**
     * Routes one dequeued task to the handler matching its type.
     *
     * <p>Any exception raised by a handler (including a failed cast) is logged and
     * swallowed so that it does not propagate out of this method.
     *
     * @param task the task taken from the queue
     */
    @Override
    protected void dispatchInWorkThread(BaseTask task) {
        try {
            if (task.getType() == BaseTask.TYPE_BUY_TASK) {
                BuyTask buyTask = (BuyTask) task;
                processBuyTask(buyTask);
            } else if (task.getType() == BaseTask.TYPE_OPERATOR_TASK) {
                OperatorTask operatorTask = (OperatorTask) task;
                processOperator(operatorTask);
            } else if (task.getType() == BaseTask.TYPE_GET_COORDINATE) {
                GetCoordinateTask getCoordinateTask = (GetCoordinateTask) task;
                processGetCoordinate(getCoordinateTask);
            } else {
                // Make dropped tasks visible instead of discarding them silently.
                log.warn("unhandled task type: {}", task.getType());
            }
        } catch (Exception e) {
            // Pass the throwable as the trailing argument so the logger records the
            // full stack trace (replaces printStackTrace(), which bypassed the logger).
            // NOTE(review): assumes an SLF4J-style Logger - confirm.
            log.warn("exception: {}", e.toString(), e);
        }
    }

    /**
     * Handles an operator task. Not implemented yet.
     *
     * @param operatorTask the task to process
     */
    private void processOperator(OperatorTask operatorTask){
        //todo
    }

    ...
}
    

任務基類(SimpleEventDispatcher)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * Dispatches queued events one at a time on a single dedicated worker thread.
 *
 * <p>Subclasses override {@link #dispatchInWorkThread(Object)} to handle events,
 * {@link #dispatchOnIdle()} to run periodic work, and {@link #dispatchOnExit()} to
 * clean up when the worker stops. All three hooks run on the worker thread.
 *
 * <p>NOTE: the worker thread is started from the constructor, so a hook may run
 * before a subclass constructor has finished. Subclass hooks should not rely on
 * fields that are initialised in the subclass constructor.
 *
 * @param <EventType> type of the queued events
 */
public class SimpleEventDispatcher<EventType> {

    /** Idle timeout, in milliseconds, used by the no-argument constructor. */
    private static final long DEFAULT_IDLE_TIMEOUT_MS = 2500;

    private final BlockingQueue<EventType> blockingQueue = new LinkedBlockingDeque<>();
    private final WorkerThread mWorkerThread;
    private final long idleTimeout;

    /** Creates a dispatcher with the default idle timeout and starts its worker thread. */
    protected SimpleEventDispatcher() {
        this(DEFAULT_IDLE_TIMEOUT_MS);
    }

    /**
     * Creates a dispatcher and starts its worker thread.
     *
     * @param idleTimeoutArg maximum time, in milliseconds, the worker waits for an
     *                       event before calling {@link #dispatchOnIdle()}
     */
    protected SimpleEventDispatcher(long idleTimeoutArg) {
        idleTimeout = idleTimeoutArg;
        mWorkerThread = new WorkerThread(blockingQueue);
        mWorkerThread.start();
    }

    /**
     * Enqueues an event for the worker thread. {@code null} events are ignored.
     *
     * @param event the event to enqueue, may be {@code null}
     */
    public void put(EventType event) {
        if (event != null) {
            blockingQueue.add(event);
        }
    }

    /**
     * Asks the worker thread to stop by interrupting it. The worker leaves its loop
     * and calls {@link #dispatchOnExit()}; events still in the queue are not dispatched.
     */
    public void shutdown() {
        mWorkerThread.interrupt();
    }

    /**
     * Called on the worker thread for every dequeued event. Does nothing by default.
     *
     * @param event the dequeued event, never {@code null}
     */
    protected void dispatchInWorkThread(EventType event) {

    }

    /**
     * Called on the worker thread when no event arrived within the idle timeout, or
     * after an event was handled once at least the idle timeout has passed since the
     * previous idle call. Does nothing by default.
     */
    protected void dispatchOnIdle() {

    }

    /** Called once on the worker thread after its loop ends. Does nothing by default. */
    protected void dispatchOnExit() {

    }

    /** Worker thread that polls the queue and invokes the enclosing dispatcher's hooks. */
    public class WorkerThread extends Thread {

        private final BlockingQueue<EventType> queue;

        /**
         * @param queue the queue this worker polls for events
         */
        public WorkerThread(BlockingQueue<EventType> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            // Time of the last dispatchOnIdle() call (or of thread start).
            long timestamp = System.currentTimeMillis();
            while (!isInterrupted()) {
                try {
                    EventType event = queue.poll(idleTimeout, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        dispatchInWorkThread(event);
                        // Under sustained load poll() never times out, so trigger the
                        // idle hook here once the timeout has elapsed.
                        if (System.currentTimeMillis() - timestamp >= idleTimeout) {
                            dispatchOnIdle();
                            timestamp = System.currentTimeMillis();
                        }
                    } else {
                        dispatchOnIdle();
                        timestamp = System.currentTimeMillis();
                    }
                } catch (InterruptedException e) {
                    // poll() clears the interrupt flag when it throws. Swallowing the
                    // exception would leave the loop condition false forever and make
                    // shutdown() a no-op, so restore the flag and stop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            dispatchOnExit();
        }
    }
}

TaskManager 繼承基類 SimpleEventDispatcher,由基類的工作執行緒逐一處理任務;任務資料透過 put() 放入佇列,例如:

    /**
     * Accepts an operator request and hands it to {@link TaskManager} for
     * asynchronous processing.
     *
     * <p>The returned {@link DeferredResult} has a 10-second timeout. It is completed
     * here only on failure: with {@code ResponseError.BodyContentInvalid} when the
     * body cannot be parsed or is not valid, or with
     * {@code ResponseError.OperatorFailed} when the task cannot be created or
     * enqueued. Otherwise the result is passed to the {@code OperatorTask}.
     *
     * @param reqBody raw request body
     * @return the deferred response for this request
     */
    @PostMapping(value = "reqOperator")
    public DeferredResult<ResponseData> reqOperator(@RequestBody String reqBody) {
        // Timeout in milliseconds (10 seconds).
        DeferredResult<ResponseData> future = new DeferredResult<>(10_000L);
        log.info("reqOperator req:{}", reqBody);
        Pkg.ReqOperator req = Utility.parseRequestBody(reqBody, Pkg.ReqOperator.class);
        if (req == null || !req.isValid()) {
            future.setResult(ResponseError.BodyContentInvalid);
            // Stop here: an invalid request must not be queued as a task.
            return future;
        }
        try {
            OperatorTask operatorTask = new OperatorTask(req, future);
            TaskManager.getInstance().put(operatorTask);
        } catch (Exception e) {
            log.warn("reqOperator failed to enqueue task", e);
            future.setResult(ResponseError.OperatorFailed);
        }
        return future;
    }

    //注意:DeferredResult 的用法類似 C++11 的 future

這樣併發資料進來,根據佇列一一處理。