1. 程式人生 > 實用技巧 >時間輪演算法 簡單學習

時間輪演算法 簡單學習

時間輪

參考: https://github.com/wolaiye1010/zdc-java-script/
參考: https://www.cnblogs.com/zhongwencool/p/timing_wheel.html

為什麼要用時間輪實現

通常用於實現linux核心任務、遊戲類的buf計時。
單個時間輪侷限性:儲存的任務數量少,不能超過當前時間輪。
多層時間輪,典型:日-時-分-秒
傳統java實現定時:Timer,只能單執行緒,會阻塞; Executors.newScheduledThreadPoll, 使用的最小堆來實現,任務還是不能太多,新增時間複雜度為O(logn)

時間輪簡單實現

1.實現日、時、分、秒、毫秒時間輪,設定Tick間隔,tick=100ms
2.建立對應的時間輪陣列,ms=1000/tick,s=60,min=60,hour=24,day=365,陣列中的每個元素型別為List
3.新增定時任務(不能新增到當前tick區間)
3.1 獲取當前時間輪時間 curTime, 獲取定時任務delayTime
3.2使用curTime+delayTime獲得任務下一次執行的事件,並判斷是否需要時間輪升級(ms->s->min->hour->day)
3.3將該定時任務新增到升級後時間輪的具體索引上
4.時間輪-one Tick增加
4.1 每一個tick間隔,將會進行一次one-tick增加,會觸發新的tick間隔內的任務,如果該任務period大於0,則繼續新增該定時任務,參考3
4.2 每一層的時間輪完成後都會進行,時間輪升級(例如59s,增加到下一輪,現獲取下一輪min的所有任務,然後再更新新的min對應s的時間輪),升級後新的時間輪任務就會在具體的降級時間輪中進行定位新增;

時間輪演算法實現-參考https://github.com/wolaiye1010/zdc-java-script/ 加了些註釋,增加取消

public class TimeWheelService {


    public static void main(String[] args) {
        TimeWheelService timeWheelService = new TimeWheelService(3);


        timeWheelService.schedule(()->{
            for(int i=0;i<100;i++){
                final int a=i;
                timeWheelService.schedule(()-> System.out.println("^^^^^^buff-"+a),100,80);
            }
        },100,0);
        timeWheelService.schedule(()->{
            for(int i=0;i<100;i++){
                final int a=i;
                timeWheelService.schedule(()-> System.out.println("=====>debuff-"+a),100,100);
            }
        },100,0);
        timeWheelService.schedule(()-> System.out.println(new Date()),10,1000);
    }

    private MultiTimeWheel timeWheel=new MultiTimeWheel();
    private TimeWheelThread timeWheelThread=new TimeWheelThread();


    //每輪的時間輪長度
    private static final int TICK=10;
    private static final int wheelIndexMillisecondLength=1000/TICK;
    private static final int wheelIndexSecondLength=60;
    private static final int wheelIndexMinuteLength=60;
    private static final int wheelIndexHourLength=24;
    private static final int wheelIndexDayLength=365;

    //每一輪對應的所有ticks
    private static final long wheelMillisecondAllTicks=1L;
    //1s  10格
    private static final long wheelSecondAllTicks=wheelMillisecondAllTicks*wheelIndexMillisecondLength;
    //1min  600
    private static final long wheelMinuteAllTicks=wheelSecondAllTicks*wheelIndexSecondLength;
    //1h
    private static final long wheelHourAllTicks=wheelMinuteAllTicks*wheelIndexMinuteLength;
    //1day
    private static final long wheelDayAllTicks=wheelHourAllTicks*wheelIndexHourLength;


    //每一輪當前的索引,可以精確獲取時間
    private AtomicInteger wheelIndexMillisecond=new AtomicInteger(0);
    private AtomicInteger wheelIndexSecond=new AtomicInteger(0);
    private AtomicInteger wheelIndexMinute=new AtomicInteger(0);
    private AtomicInteger wheelIndexHour=new AtomicInteger(0);
    private AtomicInteger wheelIndexDay=new AtomicInteger(0);


    //實際儲存
    private volatile Vector[] wheelMillisecond=new Vector[wheelIndexMillisecondLength];
    private volatile Vector[] wheelSecond=new Vector[wheelIndexSecondLength];
    private volatile Vector[] wheelMinute=new Vector[wheelIndexMinuteLength];
    private volatile Vector[] wheelHour=new Vector[wheelIndexHourLength];
    private volatile Vector[] wheelDay =new Vector[wheelIndexDayLength];



    public void schedule(Runnable runnable,long delay,long period){
        if(period<TICK && period>0) throw new RuntimeException("不能使得間隔週期小於時間片TICK:"+TICK+"  ms,間隔週期可以為0ms");
        synchronized(this){
            TimeWheelTask timeWheelTask = new TimeWheelTask(delay, period, runnable);
            schedule(timeWheelTask);
        }
    }


    public void schedule(TimeWheelTask timeWheelTask){
        //delay 加上當前相對於具體時間單位的餘數。
        //處理當前是0分59s時加入了1分1秒後任務,會導致在1分1秒時候執行,因此如果延遲本身大於當前的一輪週期,則用延遲加上當前時間與本輪毫秒值的餘數
        //00:00:59 + 61 = 00:02:00,可知,需要先加上本輪餘數
        timeWheelTask.delay=timeWheelTask.delay+timeWheel.getWheelNowTime(timeWheelTask.delay);
        timeWheel.addTaskToWheel(timeWheelTask.delay,timeWheelTask);
    }



    //真正執行定時任務的執行緒池
    private ThreadPoolExecutor threadPoolExecutor;

    public TimeWheelService(int coreSize){
        this.threadPoolExecutor=new ThreadPoolExecutor(coreSize,coreSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10000));
        timeWheelThread.start();
    }



    //輪子

    class MultiTimeWheel{

        /**
         * 增加  one-tick,可能會觸發每層輪,時間輪的升級
         */
        public void incrTick() {
            if(incIndex(TimeUnit.MILLISECONDS)){
                return;
            }

            if(incIndex(TimeUnit.SECONDS)){
                return;
            }

            if(incIndex(TimeUnit.MINUTES)){
                return;
            }

            if(incIndex(TimeUnit.HOURS)){
                return;
            }

            incIndex(TimeUnit.DAYS);

        }

        //增加一個tick,處理因為升級導致的新事件新增
        private boolean incIndex(TimeUnit timeUnit){
            long allTicksNext;
            Vector[] vectorsNext;
            AtomicInteger index;
            AtomicInteger indexNext;
            int wheelLength;
            int wheelLengthNext;

            switch(timeUnit){
                case DAYS:
                    allTicksNext=0;
                    vectorsNext=null;
                    index=wheelIndexDay;
                    indexNext=null;
                    wheelLength=wheelIndexDayLength;
                    wheelLengthNext=0;
                    break;

                case HOURS:
                    allTicksNext=wheelDayAllTicks;
                    vectorsNext=wheelDay;
                    index=wheelIndexHour;
                    indexNext=wheelIndexDay;
                    wheelLength=wheelIndexHourLength;
                    wheelLengthNext=wheelIndexDayLength;
                    break;
                case MINUTES:
                    allTicksNext=wheelHourAllTicks;
                    vectorsNext=wheelHour;
                    index=wheelIndexMinute;
                    indexNext=wheelIndexHour;
                    wheelLength=wheelIndexMinuteLength;
                    wheelLengthNext=wheelIndexHourLength;
                    break;
                case SECONDS:
                    allTicksNext=wheelMinuteAllTicks;
                    vectorsNext=wheelMinute;
                    index=wheelIndexSecond;
                    indexNext=wheelIndexMinute;
                    wheelLength=wheelIndexSecondLength;
                    wheelLengthNext=wheelIndexMinuteLength;
                    break;
                case MILLISECONDS:
                    allTicksNext=wheelSecondAllTicks;
                    vectorsNext=wheelSecond;
                    index=wheelIndexMillisecond;
                    indexNext=wheelIndexSecond;
                    wheelLength=wheelIndexMillisecondLength;
                    wheelLengthNext=wheelIndexSecondLength;
                    break;
                default:
                    throw new RuntimeException("Timeunit 引數錯誤");
            }

            index.getAndIncrement();
            if(index.get()<wheelLength){
                return true;
            }
            index.set(index.get()%wheelLength);
            //如果是天數,因為當處理hours時候已經處理過天了,所以直接返回。
            if(timeUnit.equals(TimeUnit.DAYS)){
                return true;
            }

            //獲取下一個時間輪的任務,並新增
            List<TimeWheelTask> taskList = vectorsNext[(indexNext.get() + 1) % wheelLengthNext];

            if(null!=taskList){
                for(TimeWheelTask task:taskList){
                    addTaskToWheel(task.delay%(allTicksNext),task);
                }
                taskList.clear();
            }
            return false;
        }

        public List<TimeWheelTask> getTaskList() {
            return wheelMillisecond[wheelIndexMillisecond.get()];
        }


        //加入時間輪,判斷是否需要升級
        void addTaskToWheel(long delay,TimeWheelTask task){
            if(delay>=wheelIndexDayLength*wheelDayAllTicks*TICK){
                throw new RuntimeException("不能超過一年");
            }
            if(addTaskToWheel(delay,task,TimeUnit.DAYS)){
                return;
            }
            if(addTaskToWheel(delay,task,TimeUnit.HOURS)){
                return;
            }
            if(addTaskToWheel(delay,task,TimeUnit.MINUTES)){
                return;
            }
            if(addTaskToWheel(delay,task,TimeUnit.SECONDS)){
                return;
            }
            addTaskToWheel(delay,task,TimeUnit.MILLISECONDS);

        }


        //新增任務到時間輪,
        private boolean addTaskToWheel(long delay, TimeWheelTask timeWheelTask, TimeUnit timeUnit){
            long allTicks;
            Vector[] vectors;
            AtomicInteger index;
            int wheelLength;
            switch (timeUnit){
                case DAYS:
                    allTicks=wheelDayAllTicks;
                    vectors= wheelDay;
                    index=wheelIndexDay;
                    wheelLength=wheelIndexDayLength;
                    break;
                case HOURS:
                    allTicks=wheelHourAllTicks;
                    vectors=wheelHour;
                    index=wheelIndexHour;
                    wheelLength=wheelIndexHourLength;
                    break;
                case MINUTES:
                    allTicks=wheelMinuteAllTicks;
                    vectors=wheelMinute;
                    index=wheelIndexMinute;
                    wheelLength=wheelIndexMinuteLength;
                    break;
                case SECONDS:
                    allTicks=wheelSecondAllTicks;
                    vectors=wheelSecond;
                    index=wheelIndexSecond;
                    wheelLength=wheelIndexSecondLength;
                    break;
                case MILLISECONDS:
                    allTicks=wheelMillisecondAllTicks;
                    vectors=wheelMillisecond;
                    index=wheelIndexMillisecond;
                    wheelLength=wheelIndexMillisecondLength;
                    break;
                default:
                    throw new RuntimeException("timeUnit 引數錯誤");
            }

            //新增到當前的索引
            if(0!=delay/(allTicks*TICK) || timeUnit.equals(TimeUnit.MILLISECONDS)){
                int indexNew=(index.get()+(int)(delay/(allTicks*TICK)))%wheelLength;
                if(null==vectors[indexNew]){
                    vectors[indexNew]=new Vector();
                }
                vectors[indexNew].add(timeWheelTask);
                return true;
            }
            return false;

        }


        //準確獲取當前需要新增的準確時間輪
        long getWheelNowTime(long delay){
            //當前時間 毫秒
            long timeFromWheelStart=(wheelIndexDay.get()*wheelDayAllTicks+wheelIndexHour.get()*wheelHourAllTicks+wheelIndexMinute.get()*wheelMinuteAllTicks
                    +wheelIndexSecond.get()*wheelSecondAllTicks+wheelIndexMillisecond.get()*wheelMillisecondAllTicks)*TICK;

            //從大到小開始處理,是否大於1天
            if(0!=delay/(wheelDayAllTicks*TICK)){
                return timeFromWheelStart%(wheelDayAllTicks*TICK);
            }

            //大於1小時
            if(0!=delay/(wheelHourAllTicks*TICK)){
                return timeFromWheelStart%(wheelHourAllTicks*TICK);
            }

            //大於1分鐘
            if(0!=delay/(wheelMinuteAllTicks*TICK)){
                return timeFromWheelStart%(wheelMinuteAllTicks*TICK);
            }

            //大於1秒
            if(0!=delay/(wheelSecondAllTicks*TICK)){
                return timeFromWheelStart%(wheelSecondAllTicks*TICK);
            }

            return 0;
        }


    }


    /**
     * 時間輪的task
     */
    class TimeWheelTask implements Runnable{
        private long delay;
        //-1 為手動取消、0為1次定時、大於0表示正常排程間隔
        private long period;
        private Runnable runnable;

        public void setDelay(long delay) {
            this.delay = delay;
        }

        TimeWheelTask(long delay, long period, Runnable runnable){
            this.delay=delay;
            this.period=period;
            this.runnable=runnable;
        }

        /**
         * 判斷是否是週期性的排程任務
         * @return
         */
        public boolean isPeriodSchedule(){
            return period>0;
        }

        @Override
        public void run() {
            if(this.period==-1){
                return;
            }
            runnable.run();
        }

        /**
         * 手動消除排程
         */
        public void cancel(){
            this.period=-1;
        }

    }


    //時間輪 main執行緒
    class TimeWheelThread extends Thread{

        public TimeWheelThread(){
            super("TimeWheel_main");
        }


        public TimeWheelThread(String thread_name){
            super(thread_name);
        }


        @Override
        public void run() {
            try {
                mainLoop();
            }catch (Exception e){
                System.out.println(e);
            }finally {
                System.out.println(111);
            }

        }

        private void mainLoop() {

            while (true){
                //執行任務
                runTask(timeWheel.getTaskList());
                //時間增加  one-tick
                timeWheel.incrTick();

                //休眠
                try {
                    TimeUnit.MILLISECONDS.sleep(TICK);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }

            }

        }

        private void runTask(List<TimeWheelTask> taskList) {
            if(taskList==null || taskList.size()==0) return;
            for(TimeWheelTask timeWheelTask:taskList){
                threadPoolExecutor.execute(timeWheelTask);
                if(timeWheelTask.isPeriodSchedule()){
                    timeWheelTask.setDelay(timeWheelTask.period);
                    schedule(timeWheelTask);
                }
            }
            taskList.clear();


        }
    }


}

使用ScheduledThreadPoll

  ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
            System.out.println(new Date());
        }, 10, 20, TimeUnit.SECONDS);