1. 程式人生 > >Go元件學習——cron定時器

Go元件學習——cron定時器

1 前言

  轉到Go已經將近三個月,寫業務程式碼又找到了屬於Go的條件反射了。

  後置宣告和多引數返回這些Go風格程式碼寫起來也不會那麼蹩腳,甚至還有點小適應~

  反而,前幾天在寫Java的時候,發現Java怎麼啟動這麼慢,Java怎麼能夠容忍這些用不到的程式碼還理直氣壯的躺在那……等等,這些話在哪聽過類似的???

  “Go為什麼要後置宣告,多彆扭啊”

  “Go裡面為啥要定義這麼多的struct,看的頭暈”

  ……

  其實,沒有最好的語言,只有最適合的。

 

  前面《Go語言學習》系列主要介紹了一些Go的基礎知識和相較於Java的一些新特性。後續如果有相關的體會和新的還會繼續更新。

  從這篇開始,開始學習Go的一些工具類庫和開源元件,希望在學習這些優秀的開源專案過程中,更深入的瞭解Go,發現Go的威力。

 

2 cron簡介

  robfig/cron是一個第三方開源的任務排程庫,也就是我們平時說的定時任務。

  Github:https://github.com/robfig/cron

  官方文件:https://godoc.org/github.com/robfig/cron

 

3 cron如何使用

1、新建檔案cron-demo.go

package main

import (
	"fmt"
	"github.com/robfig/cron"
	"time"
)

func main() {
	c := cron.New()
	c.AddFunc("*/3 * * * * *", func() {
		fmt.Println("every 3 seconds executing")
	})

	go c.Start()
	defer c.Stop()


	select {
	case <-time.After(time.Second * 10):
		return
	}
}
  • cron.New建立一個定時器管理器

  • c.AddFunc新增一個定時任務,第一個引數是cron時間表達式,第二個引數是要觸發執行的函式

  • go c.Start()新啟一個協程,執行定時任務

  • c.Stop是等待停止訊號結束任務

 

2、在cron-demo.go檔案下執行go build

本專案採用go mod進行包管理,所以執行go build命令後,會在go.mod檔案中生成對應的依賴版本如圖所示

 

3、執行cron-demo.go

  可以看出每3秒執行一次,直到10秒後過期退出程序,任務結束。

  程式碼參見專案:go-demo專案(https://github.com/DMinerJackie/go-demo/tree/master/main/src/cron)

 

看上去這個任務排程還是蠻好用的,那麼具體是如何實現的呢,看了下原始碼,也是非常的短小精悍,目錄結構如下。

下面通過幾個問題一起看下cron是如何實現任務排程。

 

4 cron如何解析任務表示式

  上例我們看到新增“*/3 * * * * *”這樣的表示式,就能實現每3秒執行一次。

  顯然,這個表示式只是對人友好的一種約定表達形式,要真正在指定時間執行任務,cron肯定是要讀取並解析這個c表示式,轉化為具體的時間再執行。

  那我們來看看,這個具體是如何執行的。

  進入AddFunc函式實現

// AddFunc adds a func to the Cron to be run on the given schedule.
func (c *Cron) AddFunc(spec string, cmd func()) error {
	return c.AddJob(spec, FuncJob(cmd))
}

  

  這只是套了個殼,具體還要進入AddJob函式

// AddJob adds a Job to the Cron to be run on the given schedule.
func (c *Cron) AddJob(spec string, cmd Job) error {
	schedule, err := Parse(spec)
	if err != nil {
		return err
	}
	c.Schedule(schedule, cmd)
	return nil
}

  

  該函式第一行就是解析cron表示式,順藤摸瓜,我們看到具體實現如下

// Parse returns a new crontab schedule representing the given spec.
// It returns a descriptive error if the spec is not valid.
// It accepts crontab specs and features configured by NewParser.
func (p Parser) Parse(spec string) (Schedule, error) {
	if len(spec) == 0 {
		return nil, fmt.Errorf("Empty spec string")
	}
	if spec[0] == '@' && p.options&Descriptor > 0 {
		return parseDescriptor(spec)
	}

	// Figure out how many fields we need
	max := 0
	for _, place := range places {
		if p.options&place > 0 {
			max++
		}
	}
	min := max - p.optionals

	// Split fields on whitespace
	fields := strings.Fields(spec)	// 使用空白符拆分cron表示式

	// Validate number of fields
	if count := len(fields); count < min || count > max {
		if min == max {
			return nil, fmt.Errorf("Expected exactly %d fields, found %d: %s", min, count, spec)
		}
		return nil, fmt.Errorf("Expected %d to %d fields, found %d: %s", min, max, count, spec)
	}

	// Fill in missing fields
	fields = expandFields(fields, p.options)

	var err error
	field := func(field string, r bounds) uint64 {	// 抽象出filed函式,方便下面呼叫
		if err != nil {
			return 0
		}
		var bits uint64
		bits, err = getField(field, r)
		return bits
	}

	var (
		second     = field(fields[0], seconds)
		minute     = field(fields[1], minutes)
		hour       = field(fields[2], hours)
		dayofmonth = field(fields[3], dom)
		month      = field(fields[4], months)
		dayofweek  = field(fields[5], dow)
	)
	if err != nil {
		return nil, err
	}

	return &SpecSchedule{
		Second: second,
		Minute: minute,
		Hour:   hour,
		Dom:    dayofmonth,
		Month:  month,
		Dow:    dayofweek,
	}, nil
}

  

  該函式主要是將cron表示式對映為“Second, Minute, Hour, Dom, Month, Dow”6個時間維度的結構體SpecSchedule。

  SpecSchedule是實現了方法“Next(time.Time) time.Time”的結構體,而“Next(time.Time) time.Time”是定義在Schedule介面中的

// The Schedule describes a job's duty cycle.
type Schedule interface {
	// Return the next activation time, later than the given time.
	// Next is invoked initially, and then each time the job is run.
	Next(time.Time) time.Time
}

  所以,最終可以理解是將cron解析後轉換為下一次要執行的時刻,等待執行。

 

5 cron如何執行任務

  我們知道通過parser.go可以將人很好理解的表示式轉換為cron可以讀懂的要執行的時間。

  有了要執行的時間點,那麼cron具體是如何執行這些任務的呢?

  我們看下Start函式的具體實現

// Start the cron scheduler in its own go-routine, or no-op if already started.
func (c *Cron) Start() {
	if c.running {
		return
	}
	c.running = true
	go c.run()
}

  這裡會通過判定Cron的running欄位是否在執行來鉅額聽是否要啟動任務。

  顯然這裡running是false,因為在呼叫c.New初始化的時候running被設定為false。

  所以,這裡新啟一個協程用於執行定時任務,再次順藤摸瓜,我們看到run函式的實現

// Run the scheduler. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
	// Figure out the next activation times for each entry.
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
	}

	for {
		// Determine the next entry to run.
		sort.Sort(byTime(c.entries))

		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {	// 如果沒有要執行的任務或者第一個任務的待執行時間為空,則睡眠
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))	// 否則新建一個距離現在到下一個要觸發執行的Timer
		}

		for {
			select {
			case now = <-timer.C:	// 觸發時間到,執行任務
				now = now.In(c.location)
				// Run every entry whose next time was less than now
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					go c.runWithRecovery(e.Job)
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
				}

			case newEntry := <-c.add:	// 新增任務
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)

			case <-c.snapshot:	// 呼叫c.Entries()返回一個現有任務列表的snapshot
				c.snapshot <- c.entrySnapshot()
				continue

			case <-c.stop:	// 任務結束,退出
				timer.Stop()
				return
			}

			break
		}
	}
}
  • 進入該函式,首先遍歷所以任務,找到所有任務下一個要執行的時間。

  • 然後進入外層for迴圈,對於各個任務按照執行時間進行排序,保證離當前時間最近的先執行。

  • 再對任務列表進行判定,是否有任務如果沒有,則休眠,否則初始化一個timer。

 

裡層的for迴圈才是重頭戲,下面主要分析這個for迴圈裡面的任務加入和執行。

在此之前,需要了解下go標準庫的timer

timer用於指定在某個時間間隔後,呼叫函式或者表示式。

使用NewTimer就可以建立一個Timer,在指定時間間隔到達後,可以通過<-timer.C接收值。

package main

import (
	"fmt"
	"time"
)

func main() {
	timer1 := time.NewTimer(2 * time.Second)

	<-timer1.C
	fmt.Println("Timer 1 expired")

	timer2 := time.NewTimer(time.Second)
	go func() {
		<-timer2.C
		fmt.Println("Timer 2 expired")
	}()

	stop2 := timer2.Stop()
	if stop2 {
		fmt.Println("Timer 2 stopped")
	}
}

  執行結果為

Timer 1 expired
Timer 2 stopped

  timer1表示2秒後到期,在此之前都是阻塞狀態,2秒後<-timer1.C接收到訊號,執行下面的列印語句。

  timer2表示1秒後到期,但是中途被Stop掉了,相當於清除了定時功能。

 

  有了這個背景之後,我們再來看run函式的裡層for迴圈。

  接收到c.add通道

case newEntry := <-c.add:	// 新增任務
	timer.Stop()
	now = c.now()
	newEntry.Next = newEntry.Schedule.Next(now)
	c.entries = append(c.entries, newEntry)

  將timer停掉,清除設定的定時功能,並以當前時間點為起點,設定新增任務的下一次執行時間,並新增到entries任務佇列中。

 

  接收到timer.C通道

 

case now = <-timer.C:	// 觸發時間到,執行任務
	now = now.In(c.location)
    // Run every entry whose next time was less than now
    for _, e := range c.entries {
    	if e.Next.After(now) || e.Next.IsZero() {
    		break
    	}
    go c.runWithRecovery(e.Job)
    e.Prev = e.Next
    e.Next = e.Schedule.Next(now)
    }

 

  當定任務到點後,time.C就會接收到值,並新開協程執行真正需要執行的Job,之後再更新下一個要執行的任務列表。

  我們進入runWithRecovery函式,該函式從函式名就可以看出,即使出現panic也可以重新recovery,保證其他任務不受影響。

func (c *Cron) runWithRecovery(j Job) {
	defer func() {
		if r := recover(); r != nil {
			const size = 64 << 10
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			c.logf("cron: panic running job: %v\n%s", r, buf)
		}
	}()
	j.Run()
}

追根溯源,我們發現真正執行Job的是j.Run()的執行。進入這個Run函式的實現,我們看到

func (f FuncJob) Run() { f() }

沒錯,我們要執行的任務一直從AddFunc一直往下傳遞,直到這裡,我們通過呼叫Run函式,將包裝的FuncJob型別的函式通過f()的形式進行執行。

這裡說的可能比較模糊,舉個例子,Go裡面的閉包定義

func () {
    fmt.Println("test")
}()

如果這裡定義後面沒有"()"該函式就不會執行,所以結合這個看上面的定時任務是如何執行就更容易理解了。

 

6 程式碼閱讀體會

1、channel的奧妙

  通過channel可以讓感知變得輕而易舉,比如timer.C就像是時間到了,自然會有人來敲門告訴你。而不需要我們自己主動去獲取是否到期了。

2、常用類庫的使用

  比如在parser裡面我們看到了"fields := strings.Fields(spec)",在日常開發中,我們可以靈活使用這些API,避免自己造輪子的情況。

3、多思考

  之前做Java的時候,更多的是沉浸在各種工具和框架的使用,對於這些工具和框架的實現關注的不多。比如從Quartz到Spring Job,我們需要更新的是越來越好用的定時任務工具,而底層的實現升級Spring都幫我們考慮好了。這種對業務對專案有友好的,可以快速的實現業務功能開發,但是對於開發者並不友好,友好的設計麻痺了開發者對於底層原理的深究的慾望。