時間輪演算法 簡單學習
阿新 • • 發佈:2020-08-18
時間輪
參考: https://github.com/wolaiye1010/zdc-java-script/
參考: https://www.cnblogs.com/zhongwencool/p/timing_wheel.html
為什麼要用時間輪實現
通常用於實現linux核心任務、遊戲類的buf計時。
單個時間輪侷限性:儲存的任務數量少,不能超過當前時間輪。
多層時間輪,典型:日-時-分-秒
傳統java實現定時:Timer,只能單執行緒,會阻塞; Executors.newScheduledThreadPoll, 使用的最小堆來實現,任務還是不能太多,新增時間複雜度為O(logn)
時間輪簡單實現
1.實現日、時、分、秒、毫秒時間輪,設定Tick間隔,tick=100ms 2.建立對應的時間輪陣列,ms=1000/tick,s=60,min=60,hour=24,day=365,陣列中的每個元素型別為List 3.新增定時任務(不能新增到當前tick區間) 3.1 獲取當前時間輪時間 curTime, 獲取定時任務delayTime 3.2使用curTime+delayTime獲得任務下一次執行的事件,並判斷是否需要時間輪升級(ms->s->min->hour->day) 3.3將該定時任務新增到升級後時間輪的具體索引上 4.時間輪-one Tick增加 4.1 每一個tick間隔,將會進行一次one-tick增加,會觸發新的tick間隔內的任務,如果該任務period大於0,則繼續新增該定時任務,參考3 4.2 每一層的時間輪完成後都會進行,時間輪升級(例如59s,增加到下一輪,現獲取下一輪min的所有任務,然後再更新新的min對應s的時間輪),升級後新的時間輪任務就會在具體的降級時間輪中進行定位新增;
時間輪演算法實現-參考https://github.com/wolaiye1010/zdc-java-script/ 加了些註釋,增加取消
public class TimeWheelService { public static void main(String[] args) { TimeWheelService timeWheelService = new TimeWheelService(3); timeWheelService.schedule(()->{ for(int i=0;i<100;i++){ final int a=i; timeWheelService.schedule(()-> System.out.println("^^^^^^buff-"+a),100,80); } },100,0); timeWheelService.schedule(()->{ for(int i=0;i<100;i++){ final int a=i; timeWheelService.schedule(()-> System.out.println("=====>debuff-"+a),100,100); } },100,0); timeWheelService.schedule(()-> System.out.println(new Date()),10,1000); } private MultiTimeWheel timeWheel=new MultiTimeWheel(); private TimeWheelThread timeWheelThread=new TimeWheelThread(); //每輪的時間輪長度 private static final int TICK=10; private static final int wheelIndexMillisecondLength=1000/TICK; private static final int wheelIndexSecondLength=60; private static final int wheelIndexMinuteLength=60; private static final int wheelIndexHourLength=24; private static final int wheelIndexDayLength=365; //每一輪對應的所有ticks private static final long wheelMillisecondAllTicks=1L; //1s 10格 private static final long wheelSecondAllTicks=wheelMillisecondAllTicks*wheelIndexMillisecondLength; //1min 600 private static final long wheelMinuteAllTicks=wheelSecondAllTicks*wheelIndexSecondLength; //1h private static final long wheelHourAllTicks=wheelMinuteAllTicks*wheelIndexMinuteLength; //1day private static final long wheelDayAllTicks=wheelHourAllTicks*wheelIndexHourLength; //每一輪當前的索引,可以精確獲取時間 private AtomicInteger wheelIndexMillisecond=new AtomicInteger(0); private AtomicInteger wheelIndexSecond=new AtomicInteger(0); private AtomicInteger wheelIndexMinute=new AtomicInteger(0); private AtomicInteger wheelIndexHour=new AtomicInteger(0); private AtomicInteger wheelIndexDay=new AtomicInteger(0); //實際儲存 private volatile Vector[] wheelMillisecond=new Vector[wheelIndexMillisecondLength]; private volatile Vector[] wheelSecond=new Vector[wheelIndexSecondLength]; private volatile Vector[] wheelMinute=new Vector[wheelIndexMinuteLength]; private volatile Vector[] wheelHour=new Vector[wheelIndexHourLength]; private volatile Vector[] wheelDay =new Vector[wheelIndexDayLength]; public void schedule(Runnable runnable,long delay,long period){ if(period<TICK && period>0) throw new RuntimeException("不能使得間隔週期小於時間片TICK:"+TICK+" ms,間隔週期可以為0ms"); synchronized(this){ TimeWheelTask timeWheelTask = new TimeWheelTask(delay, period, runnable); schedule(timeWheelTask); } } public void schedule(TimeWheelTask timeWheelTask){ //delay 加上當前相對於具體時間單位的餘數。 //處理當前是0分59s時加入了1分1秒後任務,會導致在1分1秒時候執行,因此如果延遲本身大於當前的一輪週期,則用延遲加上當前時間與本輪毫秒值的餘數 //00:00:59 + 61 = 00:02:00,可知,需要先加上本輪餘數 timeWheelTask.delay=timeWheelTask.delay+timeWheel.getWheelNowTime(timeWheelTask.delay); timeWheel.addTaskToWheel(timeWheelTask.delay,timeWheelTask); } //真正執行定時任務的執行緒池 private ThreadPoolExecutor threadPoolExecutor; public TimeWheelService(int coreSize){ this.threadPoolExecutor=new ThreadPoolExecutor(coreSize,coreSize,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10000)); timeWheelThread.start(); } //輪子 class MultiTimeWheel{ /** * 增加 one-tick,可能會觸發每層輪,時間輪的升級 */ public void incrTick() { if(incIndex(TimeUnit.MILLISECONDS)){ return; } if(incIndex(TimeUnit.SECONDS)){ return; } if(incIndex(TimeUnit.MINUTES)){ return; } if(incIndex(TimeUnit.HOURS)){ return; } incIndex(TimeUnit.DAYS); } //增加一個tick,處理因為升級導致的新事件新增 private boolean incIndex(TimeUnit timeUnit){ long allTicksNext; Vector[] vectorsNext; AtomicInteger index; AtomicInteger indexNext; int wheelLength; int wheelLengthNext; switch(timeUnit){ case DAYS: allTicksNext=0; vectorsNext=null; index=wheelIndexDay; indexNext=null; wheelLength=wheelIndexDayLength; wheelLengthNext=0; break; case HOURS: allTicksNext=wheelDayAllTicks; vectorsNext=wheelDay; index=wheelIndexHour; indexNext=wheelIndexDay; wheelLength=wheelIndexHourLength; wheelLengthNext=wheelIndexDayLength; break; case MINUTES: allTicksNext=wheelHourAllTicks; vectorsNext=wheelHour; index=wheelIndexMinute; indexNext=wheelIndexHour; wheelLength=wheelIndexMinuteLength; wheelLengthNext=wheelIndexHourLength; break; case SECONDS: allTicksNext=wheelMinuteAllTicks; vectorsNext=wheelMinute; index=wheelIndexSecond; indexNext=wheelIndexMinute; wheelLength=wheelIndexSecondLength; wheelLengthNext=wheelIndexMinuteLength; break; case MILLISECONDS: allTicksNext=wheelSecondAllTicks; vectorsNext=wheelSecond; index=wheelIndexMillisecond; indexNext=wheelIndexSecond; wheelLength=wheelIndexMillisecondLength; wheelLengthNext=wheelIndexSecondLength; break; default: throw new RuntimeException("Timeunit 引數錯誤"); } index.getAndIncrement(); if(index.get()<wheelLength){ return true; } index.set(index.get()%wheelLength); //如果是天數,因為當處理hours時候已經處理過天了,所以直接返回。 if(timeUnit.equals(TimeUnit.DAYS)){ return true; } //獲取下一個時間輪的任務,並新增 List<TimeWheelTask> taskList = vectorsNext[(indexNext.get() + 1) % wheelLengthNext]; if(null!=taskList){ for(TimeWheelTask task:taskList){ addTaskToWheel(task.delay%(allTicksNext),task); } taskList.clear(); } return false; } public List<TimeWheelTask> getTaskList() { return wheelMillisecond[wheelIndexMillisecond.get()]; } //加入時間輪,判斷是否需要升級 void addTaskToWheel(long delay,TimeWheelTask task){ if(delay>=wheelIndexDayLength*wheelDayAllTicks*TICK){ throw new RuntimeException("不能超過一年"); } if(addTaskToWheel(delay,task,TimeUnit.DAYS)){ return; } if(addTaskToWheel(delay,task,TimeUnit.HOURS)){ return; } if(addTaskToWheel(delay,task,TimeUnit.MINUTES)){ return; } if(addTaskToWheel(delay,task,TimeUnit.SECONDS)){ return; } addTaskToWheel(delay,task,TimeUnit.MILLISECONDS); } //新增任務到時間輪, private boolean addTaskToWheel(long delay, TimeWheelTask timeWheelTask, TimeUnit timeUnit){ long allTicks; Vector[] vectors; AtomicInteger index; int wheelLength; switch (timeUnit){ case DAYS: allTicks=wheelDayAllTicks; vectors= wheelDay; index=wheelIndexDay; wheelLength=wheelIndexDayLength; break; case HOURS: allTicks=wheelHourAllTicks; vectors=wheelHour; index=wheelIndexHour; wheelLength=wheelIndexHourLength; break; case MINUTES: allTicks=wheelMinuteAllTicks; vectors=wheelMinute; index=wheelIndexMinute; wheelLength=wheelIndexMinuteLength; break; case SECONDS: allTicks=wheelSecondAllTicks; vectors=wheelSecond; index=wheelIndexSecond; wheelLength=wheelIndexSecondLength; break; case MILLISECONDS: allTicks=wheelMillisecondAllTicks; vectors=wheelMillisecond; index=wheelIndexMillisecond; wheelLength=wheelIndexMillisecondLength; break; default: throw new RuntimeException("timeUnit 引數錯誤"); } //新增到當前的索引 if(0!=delay/(allTicks*TICK) || timeUnit.equals(TimeUnit.MILLISECONDS)){ int indexNew=(index.get()+(int)(delay/(allTicks*TICK)))%wheelLength; if(null==vectors[indexNew]){ vectors[indexNew]=new Vector(); } vectors[indexNew].add(timeWheelTask); return true; } return false; } //準確獲取當前需要新增的準確時間輪 long getWheelNowTime(long delay){ //當前時間 毫秒 long timeFromWheelStart=(wheelIndexDay.get()*wheelDayAllTicks+wheelIndexHour.get()*wheelHourAllTicks+wheelIndexMinute.get()*wheelMinuteAllTicks +wheelIndexSecond.get()*wheelSecondAllTicks+wheelIndexMillisecond.get()*wheelMillisecondAllTicks)*TICK; //從大到小開始處理,是否大於1天 if(0!=delay/(wheelDayAllTicks*TICK)){ return timeFromWheelStart%(wheelDayAllTicks*TICK); } //大於1小時 if(0!=delay/(wheelHourAllTicks*TICK)){ return timeFromWheelStart%(wheelHourAllTicks*TICK); } //大於1分鐘 if(0!=delay/(wheelMinuteAllTicks*TICK)){ return timeFromWheelStart%(wheelMinuteAllTicks*TICK); } //大於1秒 if(0!=delay/(wheelSecondAllTicks*TICK)){ return timeFromWheelStart%(wheelSecondAllTicks*TICK); } return 0; } } /** * 時間輪的task */ class TimeWheelTask implements Runnable{ private long delay; //-1 為手動取消、0為1次定時、大於0表示正常排程間隔 private long period; private Runnable runnable; public void setDelay(long delay) { this.delay = delay; } TimeWheelTask(long delay, long period, Runnable runnable){ this.delay=delay; this.period=period; this.runnable=runnable; } /** * 判斷是否是週期性的排程任務 * @return */ public boolean isPeriodSchedule(){ return period>0; } @Override public void run() { if(this.period==-1){ return; } runnable.run(); } /** * 手動消除排程 */ public void cancel(){ this.period=-1; } } //時間輪 main執行緒 class TimeWheelThread extends Thread{ public TimeWheelThread(){ super("TimeWheel_main"); } public TimeWheelThread(String thread_name){ super(thread_name); } @Override public void run() { try { mainLoop(); }catch (Exception e){ System.out.println(e); }finally { System.out.println(111); } } private void mainLoop() { while (true){ //執行任務 runTask(timeWheel.getTaskList()); //時間增加 one-tick timeWheel.incrTick(); //休眠 try { TimeUnit.MILLISECONDS.sleep(TICK); } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException(e); } } } private void runTask(List<TimeWheelTask> taskList) { if(taskList==null || taskList.size()==0) return; for(TimeWheelTask timeWheelTask:taskList){ threadPoolExecutor.execute(timeWheelTask); if(timeWheelTask.isPeriodSchedule()){ timeWheelTask.setDelay(timeWheelTask.period); schedule(timeWheelTask); } } taskList.clear(); } } }
使用ScheduledThreadPoll
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> { System.out.println(new Date()); }, 10, 20, TimeUnit.SECONDS);