.Net core 下Quartz.Net + mysql 高可用模式
當下定時服務元件,相信很多大佬都很熟悉了 ,比如hangfire、quartz.net、以及xxljob,公司級別個人感覺還是xxljob最好用,開源簡單,還有叢集,故障恢復,定時準確,對比其他定時元件有很大的優勢,但總有它實現不了的功能......
之前做的定時業務中,都是使用xxljob來實現定時的,非常方便快捷,使用cron表示式就可以實現定時或者迴圈定時的功能。
這個時候就有一個比較尷尬的問題了,定時的時間是不規則的,比如每47秒執行一次,這個時候使用cron表示式就無法準確的定時了,就會出現這種場面:
可以看出來假設我們設定的是每47秒執行一次,會出現每00秒也執行一次,達不到業務的要求,所以這個時候在想其他方式解決這個問題,最終還是選擇了quartz.net 。
回到正題,使用quartz.net + mysql 首先需要建立資料庫表:
官方建表的指令碼:
https://github.com/quartznet/quartznet/blob/main/database/tables/tables_mysql_innodb.sql
通過官方給的指令碼一鍵建表非常的方便,接下來我們就是引入Quartz.net 包,引入之後就是初始化了:
我們需要寫個初始化方法,這裡很重要,一定需要攜帶資料來源引數(這個坑踩了好久,不然啟動不了):
/// <summary> /// 初始化quartnet /// </summary> publicstatic class QuartzSchedulerFatory { public async static Task Init(string connection, long count) { try { //1.首先建立一個作業排程池 var properties = new NameValueCollection(); //schedule名稱 properties["quartz.scheduler.instanceName"] = "MyClusteredScheduler"; properties["quartz.scheduler.instanceId"] = "AUTO"; properties["quartz.threadPool.type"] = "Quartz.Simpl.DefaultThreadPool, Quartz"; properties["quartz.threadPool.threadCount"] = $"{count}"; properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX,Quartz"; //表明字首 properties["quartz.jobStore.tablePrefix"] = "QRTZ_"; // 序列化型別,必須新增,不新增無法註冊資料庫 properties["quartz.serializer.type"] = "binary"; //驅動型別 properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate,Quartz"; //資料來源名稱 properties["quartz.jobStore.dataSource"] = "myDS"; properties["quartz.jobStore.clustered"] = "true"; properties["quartz.jobStore.clusterCheckinInterval"] = "20000"; //連線字串 properties["quartz.dataSource.myDS.connectionString"] = connection; //版本 properties["quartz.dataSource.myDS.provider"] = "MySql"; //最大連結數 properties["quartz.dataSource.myDS.maxConnections"] = "100"; // First we must get a reference to a scheduler var schedulerFactory = new StdSchedulerFactory(properties); var scheduler = await schedulerFactory.GetScheduler(); scheduler.Start(); } catch (Exception ex) { Log.Logger.Error($"{DateTime.UtcNow}: 初始化quartz.net異常:{ex.Message}"); } } }
然後在startup類中的ConfigureServices方法中注入:
QuartzSchedulerFatory.Init(_configuration.GetValue<string>("Mysql:Default"), _configuration.GetValue<long>("JobMaxCount"));
然後就是寫我們執行的方法了:
/// <summary> /// Quartz 執行類 /// </summary> public class JobExcuteService : IJob { /// <summary> /// 觸發器觸發之後執行的方法 /// </summary> /// <param name="context"></param> /// <returns></returns> public async Task Execute(IJobExecutionContext context) { var id = Convert.ToInt64(context.JobDetail.Key.Name); Log.Logger.Information($"{id} 任務執行,執行時間:" + DateTime.Now.ToString()); // 依賴注入 var provider = ServiceLocator.ServiceProvider; var mongo = provider.GetService<IMongoService>(); // 獲取例項 } }
這裡需要注意的一點,因為執行類是不支援構造方法中注入例項的,所以我們需要寫個獲取例項的方法:
public static class ServiceLocator { public static IServiceProvider ServiceProvider { get; private set; } public static void SetService(IServiceProvider service) { ServiceProvider = service; } }
然後需要在startup注入IServiceProvider 物件:
ServiceLocator.SetService(service: services.BuildServiceProvider());
這樣就解決了無法構造注入的問題了。寫好執行類之後就需要編寫建立類:
/// <summary> /// Quartzjob操作類 /// </summary> public static class QuartzJobHelper { /// <summary> /// job 組 /// </summary> public static readonly string group = "Operation_BroadCast"; /// <summary> /// 建立任務 /// </summary> /// <param name="name"></param> /// <param name="startTime"></param> /// <param name="endTime"></param> /// <param name="frequency"></param> public static async Task<bool> CreateJob(string name, long startTime, long endTime, int frequency) { var result = false; try { Log.Logger.Information($"{DateTime.UtcNow}: 新建定時任務({name})開始---------------------- \n"); //IContainer container = JobModule.ConfigureContainer(new ContainerBuilder()).Build(); ISchedulerFactory schedulerFactory = new StdSchedulerFactory(); IScheduler scheduler = await schedulerFactory.GetScheduler("MyClusteredScheduler"); //IScheduler scheduler = (IScheduler)StdSchedulerFactory.GetDefaultScheduler().Result; IJobDetail job = JobBuilder.Create<JobExcuteService>() .WithIdentity(name, group) .Build(); long delayTime = endTime + (frequency * 1000 * 3) + 1000; var end = DateTimeOffset.FromUnixTimeMilliseconds(delayTime); ITrigger trigger; if (startTime != 0) { var start = DateTimeOffset.FromUnixTimeMilliseconds(startTime); trigger = TriggerBuilder.Create() .WithIdentity(name, group) .StartAt(start) .EndAt(end) .WithSimpleSchedule(s => s .WithIntervalInSeconds(frequency) .RepeatForever()) .Build(); } else { trigger = TriggerBuilder.Create() .WithIdentity(name, group) .StartNow() .EndAt(end) .WithSimpleSchedule(s => s .WithIntervalInSeconds(frequency) .RepeatForever()) .Build(); } //var cts = new CancellationTokenSource(); //var scheduler = container.Resolve<IScheduler>(); await scheduler.ScheduleJob(job, trigger); await scheduler.Start(); Log.Logger.Information($"\n {DateTime.UtcNow}: 新建定時任務({name}) 成功---------------------- \n"); result = true; } catch (Exception ex) { Log.Logger.Error($"{DateTime.UtcNow}: 新建定時任務異常:{ex.Message}"); } return result; }
然後就可以根據業務場景進行建立任務刪除任務,達到我們的目的,但是官方宣稱使用mysql或者其他資料庫持久化的時候,是有可能出現死鎖的情況,儘管出現的概率不高,所以需要監控使用,有問題及時處理。
如有錯誤,歡迎指正,互相學習。謝謝!