Colly原始碼解析——結合例子分析底層實現
通過《Colly原始碼解析——主體流程》分析,我們可以知道Colly執行的主要流程。本文將結合http://go-colly.org上的例子分析一些高階設定的底層實現。(轉載請指明出於breaksoftware的csdn部落格)
遞迴深度
以下例子擷取於Basic
c := colly.NewCollector( // Visit only domains: hackerspaces.org, wiki.hackerspaces.org colly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"), ) // On every a element which has href attribute call callback c.OnHTML("a[href]", func(e *colly.HTMLElement) { link := e.Attr("href") // Print link fmt.Printf("Link found: %q -> %s\n", e.Text, link) // Visit link found on page // Only those links are visited which are in AllowedDomains c.Visit(e.Request.AbsoluteURL(link)) })
c是Collector指標,它的Visit方法給scrape傳遞的“深度”值是1。
func (c *Collector) Visit(URL string) error {
return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}
由於NewCollector構造的Collector.MaxDepth為0,而在scrape方法內部呼叫的requestCheck中,如果此值為0,則不會去做深度檢測
// requestCheck method if c.MaxDepth > 0 && c.MaxDepth < depth { return ErrMaxDepth }
如果希望通過MaxDepth控制深度,則可以參見Max depth例子
c := colly.NewCollector( // MaxDepth is 1, so only the links on the scraped page // is visited, and no further links are followed colly.MaxDepth(1), ) // On every a element which has href attribute call callback c.OnHTML("a[href]", func(e *colly.HTMLElement) { link := e.Attr("href") // Print link fmt.Println(link) // Visit link found on page e.Request.Visit(link) })
第4行將深度設定為1,這樣理論上只能訪問第一層的URL。
如果OnHTML中的程式碼和Basic例子一樣,即使用Collector的Visit訪問URL,則由於其depth一直傳1,而導致requestCheck的深度檢測一直不滿足條件,從而會訪問超過1層的URL。
所以第13行,呼叫的是HTMLElement的Visit方法
func (r *Request) Visit(URL string) error {
return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true)
}
相較於Collector的Visit,HTMLElement的Visit方法將Depth增加了1,並且傳遞了請求的上下文(ctx)。由於depth有變化,所以之後的深度檢測會返回錯誤,從而只會訪問1層URL。
規則
Collector的Limit方法用於設定各種規則。這些規則最終在Collector的httpBackend成員中執行。
一個Collector只有一個httpBackend結構體指標,而一個httpBackend結構體可以有一組規則
type httpBackend struct {
LimitRules []*LimitRule
Client *http.Client
lock *sync.RWMutex
}
規則針對Domain來區分,我們可以通過設定不同的匹配規則,讓每組URL執行相應的操作。這些操作包括:
- 訪問並行數
- 訪問間隔延遲
參見Parallel例子。只擷取其中關鍵一段
// Limit the maximum parallelism to 2
// This is necessary if the goroutines are dynamically
// created to control the limit of simultaneous requests.
//
// Parallelism can be controlled also by spawning fixed
// number of go routines.
c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})
Collector的Limit最終會呼叫到httpBackend的Limit,它將規則加入到規則組後初始化該規則。
// Init initializes the private members of LimitRule
func (r *LimitRule) Init() error {
waitChanSize := 1
if r.Parallelism > 1 {
waitChanSize = r.Parallelism
}
r.waitChan = make(chan bool, waitChanSize)
hasPattern := false
if r.DomainRegexp != "" {
c, err := regexp.Compile(r.DomainRegexp)
if err != nil {
return err
}
r.compiledRegexp = c
hasPattern = true
}
if r.DomainGlob != "" {
c, err := glob.Compile(r.DomainGlob)
if err != nil {
return err
}
r.compiledGlob = c
hasPattern = true
}
if !hasPattern {
return ErrNoPattern
}
return nil
}
第7行建立了一個可以承載waitChanSize個元素的channel。可以看到,如果我們在規則中沒有設定並行數,也會建立只有1個元素的channel。這個channel會被用於調節並行執行的任務數量。所以這也就意味著,一旦呼叫了Limit方法而沒設定Parallelism值,該Collector中針對符合規則的請求就會變成序列的。
第10和18行分別針對不同規則初始化一個編譯器。因為這個操作比較重,所以在初始化時執行,之後只是簡單使用這些編譯器即可。
當發起請求時,流程最終會走到httpBackend的Do方法
func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) {
r := h.GetMatchingRule(request.URL.Host)
if r != nil {
r.waitChan <- true
defer func(r *LimitRule) {
randomDelay := time.Duration(0)
if r.RandomDelay != 0 {
randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay)))
}
time.Sleep(r.Delay + randomDelay)
<-r.waitChan
}(r)
}
第2行通過域名查詢對應的規則,如果找到,則在第4行嘗試往channel中加入元素。這個操作相當於上鎖。如果channel此時是滿的,則該流程會被掛起。否則就執行之後的流程。在Do函式結束,命中規則的會執行上面的匿名函式,它在休眠規則配置的時間後,嘗試從channel中獲取資料。這個操作相當於釋放鎖。
Colly就是通過channel的特性實現了並行控制。
並行
在“規則”一節,我們講到可以通過Parallelism控制並行goroutine的數量。httpBackend的Do方法最終將被Collector的fetch方法呼叫,而該方法可以被非同步執行,即是一個goroutine。這就意味著承載Do邏輯的goroutine執行完畢後就會退出。而一種類似執行緒的技術在Colly也被支援,它更像一個生產者消費者模型。消費者執行緒執行完一個任務後不會退出,而在生產者生產出的物料池中取出未處理的任務加以處理。
以下程式碼擷取於Queue
q, _ := queue.New(
2, // Number of consumer threads
&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
)
……
for i := 0; i < 5; i++ {
// Add URLs to the queue
q.AddURL(fmt.Sprintf("%s?n=%d", url, i))
}
// Consume URLs
q.Run(c)
這次沒有呼叫Collector的Visit等函式,而是呼叫了Queue的Run。
第2行建立了一個具有2個消費者(goroutine)的Queue。第10行預先給這個Queue加入5個需要訪問的URL。
// AddURL adds a new URL to the queue
func (q *Queue) AddURL(URL string) error {
u, err := url.Parse(URL)
if err != nil {
return err
}
r := &colly.Request{
URL: u,
Method: "GET",
}
d, err := r.Marshal()
if err != nil {
return err
}
return q.storage.AddRequest(d)
}
AddUrl的第11行將請求序列化,在第15行將該序列化資料儲存到“倉庫”中。
在Run方法中,Colly將啟動2個goroutine。注意它是使用for迴圈組織的,這意味著如果for內無break,它會一直迴圈執行下去——不退出。
func (q *Queue) Run(c *colly.Collector) error {
wg := &sync.WaitGroup{}
for i := 0; i < q.Threads; i++ {
wg.Add(1)
go func(c *colly.Collector, wg *sync.WaitGroup) {
defer wg.Done()
for {
如果佇列中沒有需要處理的request,則會嘗試退出
if q.IsEmpty() {
if q.activeThreadCount == 0 {
break
}
ch := make(chan bool)
q.lock.Lock()
q.threadChans = append(q.threadChans, ch)
q.lock.Unlock()
action := <-ch
if action == stop && q.IsEmpty() {
break
}
}
activeThreadCount表示當前執行中的消費者goroutine數量。如果已經沒有消費者了,則直接跳出for迴圈,整個goroutine結束。
如果還有消費者,則建立一個channel,並將其加入到q.threadChans的channel切片中。然後在第9行等待該channel被寫入值。如果寫入的是true並且此時沒有需要處理的request,則退出goroutine。可以看到這段邏輯檢測了兩次是否有request,這個我們之後再討論。
如果還有request要處理,則遞增消費者數量(在finish中會遞減以抵消)。然後從“倉庫”中取出一個任務,在通過Request的Do方法發起請求,最後呼叫finish方法善後。
q.lock.Lock()
atomic.AddInt32(&q.activeThreadCount, 1)
q.lock.Unlock()
rb, err := q.storage.GetRequest()
if err != nil || rb == nil {
q.finish()
continue
}
r, err := c.UnmarshalRequest(rb)
if err != nil || r == nil {
q.finish()
continue
}
r.Do()
q.finish()
}
}(c, wg)
}
wg.Wait()
return nil
}
finish方法幹了三件事:
- 遞減消費者數量,以抵消Run方法中的遞增。
- 將Queue的各個等待中的,其他goroutine建立的channel傳入true值,即告知他們可以退出了。
- 給Queue建立一個空的channel切片
func (q *Queue) finish() {
q.lock.Lock()
q.activeThreadCount--
for _, c := range q.threadChans {
c <- stop
}
q.threadChans = make([]chan bool, 0, q.Threads)
q.lock.Unlock()
}
我們再看下怎麼在請求的過程中給Queue增加任務
// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {
d, err := r.Marshal()
if err != nil {
return err
}
if err := q.storage.AddRequest(d); err != nil {
return err
}
q.lock.Lock()
for _, c := range q.threadChans {
c <- !stop
}
q.threadChans = make([]chan bool, 0, q.Threads)
q.lock.Unlock()
return nil
}
第3~9行,會將請求序列化後儲存到“倉庫”中。
第10~15行,會將其他goroutine建立的channel傳入false,告知它們不要退出。然後再建立一個空的channel切片。
finish和AddRequest都使用鎖鎖住了所有的邏輯,而且它們都會把其他goroutine建立的channel傳入值,然後將Queue的channel切片清空。這樣就保證這些channel只可能收到一種狀態。由於它自己建立的channel是在finish呼叫完之後才有機會創建出來,所以不會造成死鎖。
再回來看goroutine退出的邏輯
if q.IsEmpty() {
if q.activeThreadCount == 0 {
break
}
ch := make(chan bool)
q.lock.Lock()
q.threadChans = append(q.threadChans, ch)
q.lock.Unlock()
action := <-ch
if action == stop && q.IsEmpty() {
break
}
}
如果finish方法中遞減的activeThreadCount為0,這說明這是最後一個goroutine了,而且當前也沒request,所以退出。當然此時存在一種可能:在1行執行結束後,其他非消費者goroutine呼叫AddRequest新增了若干request。而執行第2行時,goroutine將退出,從而導致存在request沒有處理的可能。
如果還存在其他goroutine,則本goroutine將在第5行建立一個channel,並將這個channel加入到Queue的channel切片中。供其他goroutine呼叫finish往channel中傳入true,或者AddRequest傳入false,調控是否需要退出本過程。在第9行等待channel傳出資料前,可能存在如下幾種情況:
- 執行了finish
- 執行了AddRequest
- 執行了finish後執行了AddRequest
- 執行了AddRequest後執行了finish
如果是第1和4種,action將是false。第2和3種,action是true。但是這個情況下不能單純的通過action決定是否退出。因為第9和10行執行需要時間,這段時間其他goroutine可能還會執行AddRequest新增任務,或者GetRequest刪除任務。所以還要在第10行檢測下IsEmpty。
這段是我閱讀Colly中思考的最多的程式碼,因為有goroutine和channel,導致整個邏輯比較複雜。也感慨下,雖然goroutine很方便,但是真的能把它寫對也是不容易的。
分散式
在Queue例子中,我們看到“倉庫”這個概念。回顧下Queue的例子,“倉庫”是InMemoryQueueStorage。顧名思義,它是一個記憶體型的倉庫,所以不存在分散式基礎。
// create a request queue with 2 consumer threads
q, _ := queue.New(
2, // Number of consumer threads
&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
)
一個分散式的例子是Redis backend,擷取一段
// create the redis storage
storage := &redisstorage.Storage{
Address: "127.0.0.1:6379",
Password: "",
DB: 0,
Prefix: "httpbin_test",
}
// add storage to the collector
err := c.SetStorage(storage)
if err != nil {
panic(err)
}
// delete previous data from storage
if err := storage.Clear(); err != nil {
log.Fatal(err)
}
// close redis client
defer storage.Client.Close()
// create a new request queue with redis storage backend
q, _ := queue.New(2, storage)
這兒建立了一個redis型的倉庫。不僅Collector的Storage是它,Queue的Storage也是它。這樣一個叢集上的服務都往這個倉庫裡存入和取出資料,從而實現分散式架構。
redisstorage庫引自github.com/gocolly/redisstorage。我們檢視其原始碼,其實現了Collector的storage需要的介面
type Storage interface {
// Init initializes the storage
Init() error
// Visited receives and stores a request ID that is visited by the Collector
Visited(requestID uint64) error
// IsVisited returns true if the request was visited before IsVisited
// is called
IsVisited(requestID uint64) (bool, error)
// Cookies retrieves stored cookies for a given host
Cookies(u *url.URL) string
// SetCookies stores cookies for a given host
SetCookies(u *url.URL, cookies string)
}
以及Queue的storage需要的
// Storage is the interface of the queue's storage backend
type Storage interface {
// Init initializes the storage
Init() error
// AddRequest adds a serialized request to the queue
AddRequest([]byte) error
// GetRequest pops the next request from the queue
// or returns error if the queue is empty
GetRequest() ([]byte, error)
// QueueSize returns with the size of the queue
QueueSize() (int, error)
}