1. 程式人生 > 實用技巧 >關於動態定時任務的解決方案。

關於動態定時任務的解決方案。

之前做定時任務都是用Scheduled註解來實現,如果需要動態的配置,則不能滿足這種需求。查詢資料知道這種可以用時間輪演算法來實現。大概就是模擬時間表盤來做任務。

具體java實現:

  1 package com.education.task.provider;
  2 
  3 import org.slf4j.Logger;
  4 import org.slf4j.LoggerFactory;
  5 
  6 import java.util.HashSet;
  7 import java.util.Map;
  8 import java.util.Set;
  9 import
java.util.concurrent.ConcurrentHashMap; 10 import java.util.concurrent.ExecutorService; 11 import java.util.concurrent.TimeUnit; 12 import java.util.concurrent.atomic.AtomicBoolean; 13 import java.util.concurrent.atomic.AtomicInteger; 14 import java.util.concurrent.locks.Condition; 15 import java.util.concurrent.locks.Lock;
16 import java.util.concurrent.locks.ReentrantLock; 17 18 /** 19 * @author pengbenlei 20 * @company leenleda 21 * @date 2020/12/21 9:53 22 * @description 時間輪演算法排程任務 23 */ 24 public class RingBufferWheel { 25 26 private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class); 27
/** 28 * default ring buffer size 29 */ 30 private static final int STATIC_RING_SIZE = 60; 31 32 private Object[] ringBuffer; 33 34 private int bufferSize; 35 36 /** 37 * business thread pool 38 */ 39 private ExecutorService executorService; 40 41 private volatile int size = 0; 42 43 /*** 44 * task stop sign 45 */ 46 private volatile boolean stop = false; 47 48 /** 49 * task start sign 50 */ 51 private volatile AtomicBoolean start = new AtomicBoolean(false); 52 53 /** 54 * total tick times 55 */ 56 private AtomicInteger tick = new AtomicInteger(); 57 58 private Lock lock = new ReentrantLock(); 59 private Condition condition = lock.newCondition(); 60 61 private AtomicInteger taskId = new AtomicInteger(); 62 private Map<Integer, BusinessTask> taskMap = new ConcurrentHashMap<>(16); 63 64 /** 65 * Create a new delay task ring buffer by default size 66 * 67 * @param executorService the business thread pool 68 */ 69 public RingBufferWheel(ExecutorService executorService) { 70 this.executorService = executorService; 71 this.bufferSize = STATIC_RING_SIZE; 72 this.ringBuffer = new Object[bufferSize]; 73 } 74 75 76 /** 77 * Create a new delay task ring buffer by custom buffer size 78 * 79 * @param executorService the business thread pool 80 * @param bufferSize custom buffer size 81 */ 82 public RingBufferWheel(ExecutorService executorService, int bufferSize) { 83 this(executorService); 84 85 if (!powerOf2(bufferSize)) { 86 throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2"); 87 } 88 this.bufferSize = bufferSize; 89 this.ringBuffer = new Object[bufferSize]; 90 } 91 92 /** 93 * Add a task into the ring buffer(thread safe) 94 * 95 * @param task business task extends {@link BusinessTask} 96 */ 97 public int addTask(BusinessTask task) { 98 int key = task.getKey(); 99 int id; 100 101 try { 102 lock.lock(); 103 int index = mod(key, bufferSize); 104 task.setIndex(index); 105 Set<BusinessTask> tasks = get(index); 106 107 int cycleNum = cycleNum(key, bufferSize); 108 if (tasks != null) { 109 task.setCycleNum(cycleNum); 110 tasks.add(task); 111 } else { 112 task.setIndex(index); 113 task.setCycleNum(cycleNum); 114 Set<BusinessTask> sets = new HashSet<>(); 115 sets.add(task); 116 put(key, sets); 117 } 118 id = taskId.incrementAndGet(); 119 task.setTaskId(id); 120 taskMap.put(id, task); 121 size++; 122 } finally { 123 lock.unlock(); 124 } 125 126 start(); 127 128 return id; 129 } 130 131 132 /** 133 * Cancel task by taskId 134 * 135 * @param id unique id through {@link #addTask(BusinessTask)} 136 * @return 137 */ 138 public boolean cancel(int id) { 139 140 boolean flag = false; 141 Set<BusinessTask> tempTask = new HashSet<>(); 142 143 try { 144 lock.lock(); 145 BusinessTask task = taskMap.get(id); 146 if (task == null) { 147 return false; 148 } 149 150 Set<BusinessTask> tasks = get(task.getIndex()); 151 for (BusinessTask tk : tasks) { 152 if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) { 153 size--; 154 flag = true; 155 taskMap.remove(id); 156 } else { 157 tempTask.add(tk); 158 } 159 160 } 161 //update origin data 162 ringBuffer[task.getIndex()] = tempTask; 163 } finally { 164 lock.unlock(); 165 } 166 167 return flag; 168 } 169 170 /** 171 * Thread safe 172 * 173 * @return the size of ring buffer 174 */ 175 public int taskSize() { 176 return size; 177 } 178 179 /** 180 * Same with method {@link #taskSize} 181 * 182 * @return 183 */ 184 public int taskMapSize() { 185 return taskMap.size(); 186 } 187 188 /** 189 * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop} 190 */ 191 public void start() { 192 if (!start.get()) { 193 194 if (start.compareAndSet(start.get(), true)) { 195 logger.info("Delay task is starting"); 196 Thread job = new Thread(new TriggerJob()); 197 job.setName("consumer RingBuffer thread"); 198 job.start(); 199 start.set(true); 200 } 201 202 } 203 } 204 205 /** 206 * Stop consumer ring buffer thread 207 * 208 * @param force True will force close consumer thread and discard all pending tasks 209 * otherwise the consumer thread waits for all tasks to completes before closing. 210 */ 211 public void stop(boolean force) { 212 if (force) { 213 logger.info("Delay task is forced stop"); 214 stop = true; 215 executorService.shutdownNow(); 216 } else { 217 logger.info("Delay task is stopping"); 218 if (taskSize() > 0) { 219 try { 220 lock.lock(); 221 condition.await(); 222 stop = true; 223 } catch (InterruptedException e) { 224 logger.error("InterruptedException", e); 225 } finally { 226 lock.unlock(); 227 } 228 } 229 executorService.shutdown(); 230 } 231 232 233 } 234 235 236 private Set<BusinessTask> get(int index) { 237 return (Set<BusinessTask>) ringBuffer[index]; 238 } 239 240 private void put(int key, Set<BusinessTask> tasks) { 241 int index = mod(key, bufferSize); 242 ringBuffer[index] = tasks; 243 } 244 245 /** 246 * Remove and get task list. 247 * 248 * @param key 249 * @return task list 250 */ 251 private Set<BusinessTask> remove(int key) { 252 Set<BusinessTask> tempTask = new HashSet<>(); 253 Set<BusinessTask> result = new HashSet<>(); 254 255 Set<BusinessTask> tasks = (Set<BusinessTask>) ringBuffer[key]; 256 if (tasks == null) { 257 return result; 258 } 259 260 for (BusinessTask task : tasks) { 261 if (task.getCycleNum() == 0) { 262 result.add(task); 263 264 size2Notify(); 265 } else { 266 // decrement 1 cycle number and update origin data 267 task.setCycleNum(task.getCycleNum() - 1); 268 tempTask.add(task); 269 } 270 // remove task, and free the memory. 271 taskMap.remove(task.getTaskId()); 272 } 273 274 //update origin data 275 ringBuffer[key] = tempTask; 276 277 return result; 278 } 279 280 private void size2Notify() { 281 try { 282 lock.lock(); 283 size--; 284 if (size == 0) { 285 condition.signal(); 286 } 287 } finally { 288 lock.unlock(); 289 } 290 } 291 292 private boolean powerOf2(int target) { 293 if (target < 0) { 294 return false; 295 } 296 int value = target & (target - 1); 297 if (value != 0) { 298 return false; 299 } 300 301 return true; 302 } 303 304 private int mod(int target, int mod) { 305 // equals target % mod 306 target = target + tick.get(); 307 return target & (mod - 1); 308 } 309 310 private int cycleNum(int target, int mod) { 311 //equals target/mod 312 return target >> Integer.bitCount(mod - 1); 313 } 314 315 316 317 private class TriggerJob implements Runnable { 318 319 @Override 320 public void run() { 321 int index = 0; 322 while (!stop) { 323 try { 324 System.out.println(index); 325 Set<BusinessTask> tasks = remove(index); 326 for (BusinessTask task : tasks) { 327 executorService.submit(task); 328 } 329 330 if (++index > bufferSize - 1) { 331 index = 0; 332 } 333 334 //Total tick number of records 335 tick.incrementAndGet(); 336 TimeUnit.SECONDS.sleep(1); 337 338 } catch (Exception e) { 339 logger.error("Exception", e); 340 } 341 342 } 343 344 logger.info("Delay task has stopped"); 345 } 346 } 347 }
RingBufferWheel
 1 package com.education.task.provider;
 2 
 3 import lombok.Getter;
 4 import lombok.Setter;
 5 
 6 /**
 7  * @author pengbenlei
 8  * @company leenleda
 9  * @date 2020/12/21 10:49
10  * @description
11  */
12 @Getter
13 @Setter
14 public abstract class BusinessTask extends Thread {
15 
16     /**
17      * 所在位置
18      */
19     private int index;
20 
21     /**
22      * cycleNum 輪盤上的圈數
23      */
24     private int cycleNum;
25 
26     /**
27      * 輪盤的刻度
28      */
29     private int key;
30 
31     /**
32      * The unique ID of the task
33      */
34     private int taskId;
35 
36     @Override
37     public void run() {
38     }
39 
40 }
BusinessTask

測試呼叫:

1     public static class Job extends BusinessTask{
2         @Override
3         public void run() {
4             System.out.println("12346");
5         }
6     }
構造任務
1 SpringApplication.run(TaskProviderApplication.class, args);
2         RingBufferWheel ringBufferWheel = new RingBufferWheel( Executors.newFixedThreadPool(2));
3         for (int i = 0; i < 2; i++) {
4             BusinessTask job = new Job();
5             job.setKey(10);
6             job.setCycleNum(i);
7             ringBufferWheel.addTask(job);
8         }
呼叫程式碼