輕量級分散式延時任務處理元件easyTask-L-入門篇
今天給大家介紹一款新武器。我自研的一個java元件easyTask-L。這個是做啥的呢?我之前研發了一款單機版本的easyTask,這次是要介紹另外一款easyTask-L。區別就是後者支援分散式環境,任務資料支援多個備份,具備了真正意義上的高可用。同時它又是輕量級的分散式應用,原因是因為它還不是一個獨立的中介軟體,它需要一個宿主程式才能使用。做成獨立的中介軟體是我後面要繼續做的一個版本。
元件開源地址:https://github.com/liuche51/easyTask-L
廢話不多說,先來介紹下easyTask-L元件的特性。
高可用:因為我們是分散式leader-follow叢集,每個任務多有多個備份資料,所以可靠性非常高
秒級觸發:我們是採用時鐘秒級分片的資料結構,支援秒級觸發任務。不早也不遲
分散式:元件支援分散式
高併發:支援多執行緒同時提交任務,支援多執行緒同時執行任務
資料一致性:使用TCC事務機制,保障資料在叢集中的強一致性
海量任務:節點可以儲存非常多的任務,只要記憶體和磁碟足夠。觸發效率也是極高。需要配置好分派任務執行緒池和執行任務執行緒池大小即可
開源:元件完全在GitHub上開源。任何人都可以隨意使用,在不侵犯著作權情況下
易使用:無需獨立部署叢集,嵌入式開發。不過多的依賴於第三方中介軟體,除了zookeeper。
easyTask-L元件的整體架構如下:
整體採用分散式設計,leader-follow風格。叢集中每一個節點都是leader,同時也可能是其他某個節點的follow。每個leader都有若干個follow。leader上提交的新任務都會強制同步到follow中,刪除任務同時也會強制刪除follow中的備份任務。叢集中所有節點都會在zookeeper中註冊並維持心跳。
easyTask-L元件的核心“環形佇列”的設計架構如下:
環形佇列在之前單機版的easyTask中也講過,原理都是類似的。客戶端提交任務,服務端先將任務進行持久化,再新增上環形佇列這個資料結構中去,等待時間片輪詢的到來。不同的是這裡的持久化機制,改成了分散式儲存了。不僅leader自己儲存起來,還要同步儲存到其follow中去。刪除一個任務也是類似的過程。
任務新增時會計算其觸發所屬的時間分片槽,等環形佇列的始終秒針到達時會判斷任務是否可以被執行了。如果可以執行了,則分派任務執行緒池將其丟入執行任務執行緒池等待執行。只要執行任務執行緒池執行緒數足夠,任務將立即得到執行。
大概的原理清晰了,接下來就是寫個HelloWorld程式了!
easyTask-L不是一箇中間件,所以需要一個宿主程式。建議在微服務框架如:dubbo、spring-cloud中使用此元件,並建立一個獨立的專門用於處理延時任務的服務模組。這樣可以使服務儘可能少的頻繁更新重啟。保持叢集的穩定性。下面我將以一個springboot應用為例來給大家演示如何使用easyTask-L元件
第一步:引入jar包
如果你是Maven專案,可以使用如下方式配置引入jar包。這可以讓專案自動引入easyTask-L中依賴的其他第三方jar包。最新版本請在maven中央倉庫中查詢。請在pom.xml中加入以下引用
<dependency> <groupId>com.github.liuche51</groupId> <artifactId>easyTask-L</artifactId> <version>1.0.1</version> </dependency>
第二步:配置啟動環形佇列
這裡以springboot應用為例,在application.yml中做如下配置
server: port: 8081 spring: application: name: easyTask-L easyTaskL: zkAddress: 127.0.0.1:2181 taskStorePath: C:/db/node1 serverPort: 2021 sQLlitePoolSize: 5 backupCount: 2 dispatchPool: corePoolSize: 5 maximumPoolSize: 50 workPool: corePoolSize: 5 maximumPoolSize: 50
新建一個啟動配置類EasyTaskLConf.java
1 package com.github.liuche51.easyTaskL.config; 2 3 import com.github.liuche51.easyTask.core.AnnularQueue; 4 import com.github.liuche51.easyTask.core.EasyTaskConfig; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.beans.factory.annotation.Value; 8 import org.springframework.context.annotation.Bean; 9 import org.springframework.context.annotation.Configuration; 10 11 import java.util.concurrent.LinkedBlockingQueue; 12 import java.util.concurrent.ThreadPoolExecutor; 13 14 @Configuration 15 public class EasyTaskLConf { 16 private static Logger log = LoggerFactory.getLogger(EasyTaskLConf.class); 17 @Value("${easyTaskL.zkAddress}") 18 private String zkAddress; 19 @Value("${easyTaskL.taskStorePath}") 20 private String taskStorePath; 21 @Value("${easyTaskL.serverPort}") 22 private int serverPort; 23 @Value("${easyTaskL.sQLlitePoolSize}") 24 private int sQLlitePoolSize; 25 @Value("${easyTaskL.backupCount}") 26 private int backupCount; 27 @Value("${easyTaskL.dispatchPool.corePoolSize}") 28 private int dispatchCorePoolSize; 29 @Value("${easyTaskL.dispatchPool.maximumPoolSize}") 30 private int dispatchMaximumPoolSize; 31 @Value("${easyTaskL.workPool.corePoolSize}") 32 private int workPoolCorePoolSize; 33 @Value("${easyTaskL.workPool.maximumPoolSize}") 34 private int workPoolMaximumPoolSize; 35 @Bean 36 public AnnularQueue initAnnularQueue(){ 37 try { 38 EasyTaskConfig config =new EasyTaskConfig(); 39 config.setTaskStorePath(taskStorePath); 40 config.setServerPort(serverPort); 41 config.setSQLlitePoolSize(sQLlitePoolSize); 42 //config.setBackupCount(backupCount); 43 config.setZkAddress(zkAddress); 44 AnnularQueue annularQueue = AnnularQueue.getInstance(); 45 config.setDispatchs(new ThreadPoolExecutor(dispatchCorePoolSize, dispatchMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS, 46 new LinkedBlockingQueue<Runnable>())); 47 config.setWorkers(new ThreadPoolExecutor(workPoolCorePoolSize, workPoolMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS, 48 new LinkedBlockingQueue<Runnable>())); 49 annularQueue.start(config); 50 return annularQueue; 51 }catch (Exception e){ 52 log.error("",e); 53 return null; 54 } 55 } 56 57 }EasyTaskLConf.java
第三步:建立延時任務處理類
package com.github.liuche51.easyTaskL.task; import com.github.liuche51.easyTask.dto.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; public class CusTask1 extends Task implements Runnable { private static Logger log = LoggerFactory.getLogger(CusTask1.class); @Override public void run() { Map<String, String> param = getParam(); if (param != null && param.size() > 0) { log.info("任務1已執行!姓名:{} 生日:{} 年齡:{} 執行緒ID:{}", param.get("name"), param.get("birthday"), param.get("age"), param.get("threadid")); } } }
第四步:向環形佇列中新增任務
新建一個Controller,增加以下Action方法。
@RequestMapping("/once") @ResponseBody public String once(@RequestParam("name") String name, @RequestParam("time") int time) { CusTask1 task1 = new CusTask1(); task1.setEndTimestamp(ZonedDateTime.now().plusSeconds(time).toInstant().toEpochMilli()); Map<String, String> param = new HashMap<String, String>() { { put("name", name); put("birthday", "1996-1-1"); put("age", "28"); put("threadid", String.valueOf(Thread.currentThread().getId())); } }; task1.setParam(param); return AnnularQueue.getInstance().submitAllowWait(task1); }
完整的demo可以使用Git克隆我的一個開源專案:https://gitee.com/liuche/DubboServer.git 找到子專案easyTask-L-demo