1. 程式人生 > >Quartz.Net使用教程

Quartz.Net使用教程

在專案的開發過程中,難免會遇見後需要後臺處理的任務,例如定時傳送郵件通知、後臺處理耗時的資料處理等,這個時候你就需要Quartz.Net了。

Quartz.Net是純淨的,它是一個.Net程式集,是非常流行的Java作業排程系統Quartz的C#實現。

Quartz.Net一款功能齊全的任務排程系統,從小型應用到大型企業級系統都能適用。功能齊全體現在觸發器的多樣性上面,即支援簡單的定時器,也支援Cron表示式;即能執行重複的作業任務,也支援指定例外的日曆;任務也可以是多樣性的,只要繼承IJob介面即可。

對於小型應用,Quartz.Net可以整合到你的系統中,對於企業級系統,它提供了Routing支援,提供了Group來組織和管理任務,此外還有持久化、外掛功能、負載均衡和故障遷移等滿足不同應用場景的需要。

Hello Quartz.Net

開始使用一個框架,和學習一門開發語言一樣,最好是從Hello World程式開始。

首先建立一個示例程式,然後新增Quartz.Net的引用。

Install-Package Quartz -Version 3.0.7

我們使用的是當前最新版本3.0.7進行演示。新增引用以後,來建立一個Job類HelloQuartzJob

public class HelloQuartzJob : IJob
{
    public Task Execute(IJobExecutionContext context)
    {
        return Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Hello Quartz.Net");
        });
    }
}

這是個非常簡單的Job類,它在執行時輸出文字Hello Quartz.Net

接下來,我們在程式啟動時建立排程器(Scheduler),並新增HelloQuartzJob的排程:

static async Task MainAsync()
{
    var schedulerFactory = new StdSchedulerFactory();
    var scheduler = await schedulerFactory.GetScheduler();
    await scheduler.Start();
    Console.WriteLine($"任務排程器已啟動");

    //建立作業和觸發器
    var jobDetail = JobBuilder.Create<HelloQuartzJob>().Build();
    var trigger = TriggerBuilder.Create()
                                .WithSimpleSchedule(m => {
                                    m.WithRepeatCount(3).WithIntervalInSeconds(1);
                                })
                                .Build();

    //新增排程
    await scheduler.ScheduleJob(jobDetail, trigger);
}

然後執行程式,你會看到如下圖:

通過演示可以看出,要執行一個定時任務,一般需要四步:

  1. 建立任務排程器。排程器通常在應用程式啟動時建立,一個應用程式例項通常只需要一個排程器即可。
  2. 建立Job和JobDetail。Job是作業的型別,描述了作業是如何執行的,這個類是由我們定義的;JobDetail是Quartz對作業的封裝,它包含Job型別,以及Job在執行時用到的資料,還包括是否要持久化、是否覆蓋已存在的作業等選項。
  3. 建立觸發器。觸發器描述了在何時執行作業。
  4. 新增排程。當完成以上三步以後,就可以對作業進行排程了。

作業:Job和JobDetail

Job是作業的型別,描述了作業是如何執行的,這個型別是由我們定義的,例如上文的HelloQuartzJob。Job實現IJob介面,而IJob介面只有一個Execute方法,引數context中包含了與當前上下文中關聯的Scheduler、JobDetail、Trigger等。

一個典型的Job定義如下:

public class HelloQuartzJob : IJob
{
    public Task Execute(IJobExecutionContext context)
    {
        return Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Hello Quartz.Net");
        });
    }
}

JobData

Job不是孤立存在的,它需要執行的引數,這些引數如何傳遞進來呢?我們來定義一個Job類進行演示。

public class SayHelloJob : IJob
{
    public string UserName { get; set; }

    public Task Execute(IJobExecutionContext context)
    {
        return Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"Hello {UserName}!");
        });
    }
}

SayHelloJob在執行時需要引數UserName,這個引數被稱為JobData,Quartz.Net通過JobDataMap的方式傳遞引數。程式碼如下:

//建立作業
var jobDetail = JobBuilder.Create<SayHelloJob>()
                            .SetJobData(new JobDataMap() {
                                new KeyValuePair<string, object>("UserName", "Tom")
                            })
                            .Build();

通過JobBuilder的SetJobData方法,傳入JobDataMap物件,JobDataMap物件中可以包含多個引數,這些引數可以對映到Job類的屬性上。我們完善程式碼執行示例,可以看到如下圖:

JobDetail

JobDetail是Quartz對作業的封裝,它包含Job型別,以及Job在執行時用到的資料,還包括是否孤立儲存、請求恢復作業等選項。

JobDetail是通過JobBuilder進行建立的。例如:

var jobDetail = JobBuilder.Create<SayHelloJob>()
                            .SetJobData(new JobDataMap() {
                                new KeyValuePair<string, object>("UserName", "Tom")
                            })
                            .StoreDurably(true)
                            .RequestRecovery(true)
                            .WithIdentity("SayHelloJob-Tom", "DemoGroup")
                            .WithDescription("Say hello to Tom job")
                            .Build();

引數說明:

  • SetJobData:設定JobData
  • StoreDurably:孤立儲存,指即使該JobDetail沒有關聯的Trigger,也會進行儲存
  • RequestRecovery:請求恢復,指應用崩潰後再次啟動,會重新執行該作業
  • WithIdentity:作業的唯一標識
  • WithDescription:作業的描述資訊

除此之外,Quartz.Net還支援兩個非常有用的特性:

  • DisallowConcurrentExecution:禁止並行執行,該特性是針對JobDetail生效的
  • PersistJobDataAfterExecution:在執行完成後持久化JobData,該特性是針對Job型別生效的,意味著所有使用該Job的JobDetail都會在執行完成後持久化JobData。

持久化JobData

我們來演示一下該PersistJobDataAfterExecution特性,在SayHelloJob中,我們新加一個欄位RunSuccess,記錄任務是否執行成功。

首先在SayHelloJob新增特性:

[PersistJobDataAfterExecution]
public class SayHelloJob : IJob { }

然後在建立JobDetail時新增JobData:

var jobDetail = JobBuilder.Create<SayHelloJob>()
                            .SetJobData(new JobDataMap() {
                                new KeyValuePair<string, object>("UserName", "Tom"),
                                new KeyValuePair<string, object>("RunSuccess", false)
                            })

在執行時Job時,更新RunSuccess的值:

public Task Execute(IJobExecutionContext context)
{
    return Task.Factory.StartNew(() =>
    {
        Console.WriteLine($"Prev Run Success:{RunSuccess}");
        Console.WriteLine($"Hello {UserName}!");

        context.JobDetail.JobDataMap.Put("RunSuccess", true);
    });
}

接下來看一下執行效果:

觸發器:Trigger

Trigger是觸發器,用來定製執行作業。Trigger有兩種型別:SampleTrigger和CronTrigger,我們分別進行說明。

SampleTrigger

顧名思義,這是個簡單的觸發器,有以下特性:

  • 重複執行:WithRepeatCount()/RepeatForever()
  • 設定間隔時間:WithInterval()
  • 定時執行:StartAt()/StartNow()
  • 設定優先順序:WithPriority(),預設為5

需要注意:當Trigger到達StartAt指定的時間時會執行一次,這一次執行是不包含在WithRepeatCount中的。在我們上面的例子中可以看出,新增排程後會立即執行一次,然後重複三次,最終執行了四次。

CronTrigger

CronTrigger是通過Cron表示式來完成排程的。Cron表示式非常靈活,可以實現幾乎各種定時場景的需要。

關於Cron表示式,大家可以移步 Quartz Cron表示式

使用CronTrigger的示例如下:

var trigger = TriggerBuilder.Create()
                            .WithCronSchedule("*/1 * * * * ?")
                            .Build();

日曆:Calendar

Calendar可以與Trigger進行關聯,從Trigger中排出執行計劃。例如你只希望在工作日執行作業,那麼我們可以定義一個休息日的日曆,將它與Trigger關聯,從而排出休息日的執行計劃。

Calendar示例程式碼如下:

var calandar = new HolidayCalendar();
calandar.AddExcludedDate(DateTime.Today);

await scheduler.AddCalendar("holidayCalendar", calandar, false, false);

var trigger = TriggerBuilder.Create()
                        .WithCronSchedule("*/1 * * * * ?")
                        .ModifiedByCalendar("holidayCalendar")
                        .Build();

在這個示例中,我們建立了HolidayCalendar日曆,然後新增排除執行的日期。我們把今天新增到排除日期後,該Trigger今天將不會觸發。

監聽器:JobListeners/TriggerListeners/SchedulerListeners

監聽器是Quartz.Net的另外一個出色的功能,它允許我們編寫監聽器達到在執行時獲取作業狀態、處理作業資料等功能。

JobListener

JobListener可以監聽Job執行前、執行後、否決執行的事件。我們通過程式碼進行演示:

public class MyJobListener : IJobListener
{
    public string Name { get; } = nameof(MyJobListener);

    public Task JobToBeExecuted(IJobExecutionContext context, CancellationToken cancellationToken = default)
    {
        //Job即將執行
        return Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"Job: {context.JobDetail.Key} 即將執行");
        });
    }

    public Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken cancellationToken = default)
    {
        return Task.Factory.StartNew(()=> {
            Console.WriteLine($"Job: {context.JobDetail.Key} 被否決執行");
        });
    }

    public Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException, CancellationToken cancellationToken = default)
    {
        //Job執行完成
        return Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"Job: {context.JobDetail.Key} 執行完成");
        });
    }
}

定義完成後,將MyJobListener新增到Scheduler中:

scheduler.ListenerManager.AddJobListener(new MyJobListener(), GroupMatcher<JobKey>.AnyGroup());

然後我們再執行程式,就可以看到Listener被呼叫了:

通過圖片可以看到,JobToBeExecutedJobWasExecuted都被執行了,JobExecutionVetoed沒有執行,那麼如何觸發JobExecutionVetoed呢?請繼續閱讀TriggerListener的演示。

TriggerListener

TriggerListener可以監聽Trigger的執行情況,我們通過程式碼進行演示:

public class MyTriggerListener : ITriggerListener
{
    public string Name { get; } = nameof(MyTriggerListener);

    public Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, SchedulerInstruction triggerInstructionCode, CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }

    public Task TriggerFired(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }

    public Task TriggerMisfired(ITrigger trigger, CancellationToken cancellationToken = default)
    {
        return Task.CompletedTask;
    }

    public Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
    {
        return Task.FromResult(true);   //返回true表示否決Job繼續執行
    }
}

MyTriggerListener新增到Scheduler中:

scheduler.ListenerManager.AddTriggerListener(new MyTriggerListener(), GroupMatcher<TriggerKey>.AnyGroup());

執行程式碼可以看到如下效果:

從圖片中可以看到,JobListener中的JobExecutionVetoed被執行了。

SchedulerListener

ISchedulerListener提供了Job、Trigger管理的監聽,與排程程式相關的事件包括:新增作業/觸發器,刪除作業/觸發器,排程程式中的嚴重錯誤,排程程式關閉的通知等。完整的介面定義如下:

public interface ISchedulerListener
{
    Task JobAdded(IJobDetail jobDetail, CancellationToken cancellationToken = default);
    Task JobDeleted(JobKey jobKey, CancellationToken cancellationToken = default);
    Task JobInterrupted(JobKey jobKey, CancellationToken cancellationToken = default);
    Task JobPaused(JobKey jobKey, CancellationToken cancellationToken = default);
    Task JobResumed(JobKey jobKey, CancellationToken cancellationToken = default);
    Task JobScheduled(ITrigger trigger, CancellationToken cancellationToken = default);
    Task JobsPaused(string jobGroup, CancellationToken cancellationToken = default);
    Task JobsResumed(string jobGroup, CancellationToken cancellationToken = default);
    Task JobUnscheduled(TriggerKey triggerKey, CancellationToken cancellationToken = default);
    Task SchedulerError(string msg, SchedulerException cause, CancellationToken cancellationToken = default);
    Task SchedulerInStandbyMode(CancellationToken cancellationToken = default);
    Task SchedulerShutdown(CancellationToken cancellationToken = default);
    Task SchedulerShuttingdown(CancellationToken cancellationToken = default);
    Task SchedulerStarted(CancellationToken cancellationToken = default);
    Task SchedulerStarting(CancellationToken cancellationToken = default);
    Task SchedulingDataCleared(CancellationToken cancellationToken = default);
    Task TriggerFinalized(ITrigger trigger, CancellationToken cancellationToken = default);
    Task TriggerPaused(TriggerKey triggerKey, CancellationToken cancellationToken = default);
    Task TriggerResumed(TriggerKey triggerKey, CancellationToken cancellationToken = default);
    Task TriggersPaused(string triggerGroup, CancellationToken cancellationToken = default);
    Task TriggersResumed(string triggerGroup, CancellationToken cancellationToken = default);
}

新增SchedulerListener的程式碼如下:

scheduler.ListenerManager.AddSchedulerListener(mySchedListener);

持久化:JobStore

Quartz.Net支援Job的持久化操作,被稱為JobStore。預設情況下,Quartz將資料持久化到記憶體中,好處是記憶體的速度很快,壞處是無法提供負載均衡的支援,並且在程式崩潰後,我們將丟失所有Job資料,對於企業級系統來說,壞處明顯大於好處,因此有必要將資料儲存在資料庫中。

ADO.NET儲存

Quartz使用ADO.NET訪問資料庫,支援的資料庫廠商非常廣泛:

  • SqlServer - .NET Framework 2.0的SQL Server驅動程式
  • OracleODP - Oracle的Oracle驅動程式
  • OracleODPManaged - Oracle的Oracle 11託管驅動程式
  • MySql - MySQL Connector / .NET
  • SQLite - SQLite ADO.NET Provider
  • SQLite-Microsoft - Microsoft SQLite ADO.NET Provider
  • Firebird - Firebird ADO.NET提供程式
  • Npgsql - PostgreSQL Npgsql

資料庫的建立語句可以在Quartz.Net的原始碼中找到:https://github.com/quartznet/quartznet/tree/master/database/tables

我們可以通過配置檔案來配置Quartz使用資料庫儲存:

# job store
quartz.jobStore.type = Quartz.Impl.AdoJobStore.JobStoreTX, Quartz
quartz.jobStore.dataSource = quartz_store
quartz.jobStore.driverDelegateType = Quartz.Impl.AdoJobStore.PostgreSQLDelegate, Quartz
#quartz.jobStore.useProperties = true

quartz.dataSource.quartz_store.connectionString = Server=localhost;Database=quartz_store;userid=quartz_net;password=xxxxxx;Pooling=true;MinPoolSize=1;MaxPoolSize=10;Timeout=15;SslMode=Disable;
quartz.dataSource.quartz_store.provider = Npgsql

負載均衡

負載均衡是實現高可用的一種方式,當任務量變大以後,單臺伺服器很難滿足需要,使用負載均衡則使得系統具備了橫向擴充套件的能力,通過部署多個節點來增加處理Job的能力。

Quartz.Net在使用負載均衡時,需要依賴ADO JobStore,意味著你需要使用資料庫持久化資料。然後我們可以使用以下配置完成負載均衡功能:

quartz.jobStore.clustered = true
quartz.scheduler.instanceId = AUTO
  • clustered:叢集的標識
  • instanceId:當前Scheduler例項的ID,每個示例的ID不能重複,使用AUTO時系統會自動生成ID

當我們在多臺伺服器上執行Scheduler例項時,需要設定伺服器的時鐘時間,確保伺服器時間是相同的。針對windows伺服器,可以設定從網路自動同步時間。

通過Routing訪問Quartz例項

通過Routing訪問Quartz例項的功能,為我們做系統分離提供了很好的途徑。

我們可以通過以下配置實現Quartz的伺服器端遠端訪問:

# export this server to remoting context
quartz.scheduler.exporter.type = Quartz.Simpl.RemotingSchedulerExporter, Quartz
quartz.scheduler.exporter.port = 555
quartz.scheduler.exporter.bindName = QuartzScheduler
quartz.scheduler.exporter.channelType = tcp
quartz.scheduler.exporter.channelName = httpQuartz

然後我們在客戶端系統中配置訪問:

quartz.scheduler.proxy = true
quartz.scheduler.proxy.address = tcp://localhost:555/QuartzScheduler

參考資料

  • Quartz.Net官方文件
  • Github:Quartz.Net原始碼
  • Quartz Cron表示式
  • SampleQuartz原始碼下載