1. 程式人生 > >Colly原始碼解析——結合例子分析底層實現

Colly原始碼解析——結合例子分析底層實現

        通過《Colly原始碼解析——主體流程》分析,我們可以知道Colly執行的主要流程。本文將結合http://go-colly.org上的例子分析一些高階設定的底層實現。(轉載請指明出於breaksoftware的csdn部落格)

遞迴深度

        以下例子擷取於Basic

	c := colly.NewCollector(
		// Visit only domains: hackerspaces.org, wiki.hackerspaces.org
		colly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"),
	)

	// On every a element which has href attribute call callback
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		link := e.Attr("href")
		// Print link
		fmt.Printf("Link found: %q -> %s\n", e.Text, link)
		// Visit link found on page
		// Only those links are visited which are in AllowedDomains
		c.Visit(e.Request.AbsoluteURL(link))
	})

        c是Collector指標,它的Visit方法給scrape傳遞的“深度”值是1。

func (c *Collector) Visit(URL string) error {
	return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}

        由於NewCollector構造的Collector.MaxDepth為0,而在scrape方法內部呼叫的requestCheck中,如果此值為0,則不會去做深度檢測

// requestCheck method
	if c.MaxDepth > 0 && c.MaxDepth < depth {
		return ErrMaxDepth
	}

        如果希望通過MaxDepth控制深度,則可以參見Max depth例子

	c := colly.NewCollector(
		// MaxDepth is 1, so only the links on the scraped page
		// is visited, and no further links are followed
		colly.MaxDepth(1),
	)

	// On every a element which has href attribute call callback
	c.OnHTML("a[href]", func(e *colly.HTMLElement) {
		link := e.Attr("href")
		// Print link
		fmt.Println(link)
		// Visit link found on page
		e.Request.Visit(link)
	})

        第4行將深度設定為1,這樣理論上只能訪問第一層的URL。

        如果OnHTML中的程式碼和Basic例子一樣,即使用Collector的Visit訪問URL,則由於其depth一直傳1,而導致requestCheck的深度檢測一直不滿足條件,從而會訪問超過1層的URL。

        所以第13行,呼叫的是HTMLElement的Visit方法

func (r *Request) Visit(URL string) error {
	return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true)
}

        相較於Collector的Visit,HTMLElement的Visit方法將Depth增加了1,並且傳遞了請求的上下文(ctx)。由於depth有變化,所以之後的深度檢測會返回錯誤,從而只會訪問1層URL。

規則

        Collector的Limit方法用於設定各種規則。這些規則最終在Collector的httpBackend成員中執行。

        一個Collector只有一個httpBackend結構體指標,而一個httpBackend結構體可以有一組規則

type httpBackend struct {
	LimitRules []*LimitRule
	Client     *http.Client
	lock       *sync.RWMutex
}

        規則針對Domain來區分,我們可以通過設定不同的匹配規則,讓每組URL執行相應的操作。這些操作包括:

  • 訪問並行數
  • 訪問間隔延遲

        參見Parallel例子。只擷取其中關鍵一段

	// Limit the maximum parallelism to 2
	// This is necessary if the goroutines are dynamically
	// created to control the limit of simultaneous requests.
	//
	// Parallelism can be controlled also by spawning fixed
	// number of go routines.
	c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})

        Collector的Limit最終會呼叫到httpBackend的Limit,它將規則加入到規則組後初始化該規則。

// Init initializes the private members of LimitRule
func (r *LimitRule) Init() error {
	waitChanSize := 1
	if r.Parallelism > 1 {
		waitChanSize = r.Parallelism
	}
	r.waitChan = make(chan bool, waitChanSize)
	hasPattern := false
	if r.DomainRegexp != "" {
		c, err := regexp.Compile(r.DomainRegexp)
		if err != nil {
			return err
		}
		r.compiledRegexp = c
		hasPattern = true
	}
	if r.DomainGlob != "" {
		c, err := glob.Compile(r.DomainGlob)
		if err != nil {
			return err
		}
		r.compiledGlob = c
		hasPattern = true
	}
	if !hasPattern {
		return ErrNoPattern
	}
	return nil
}

        第7行建立了一個可以承載waitChanSize個元素的channel。可以看到,如果我們在規則中沒有設定並行數,也會建立只有1個元素的channel。這個channel會被用於調節並行執行的任務數量。所以這也就意味著,一旦呼叫了Limit方法而沒設定Parallelism值,該Collector中針對符合規則的請求就會變成序列的。

        第10和18行分別針對不同規則初始化一個編譯器。因為這個操作比較重,所以在初始化時執行,之後只是簡單使用這些編譯器即可。

        當發起請求時,流程最終會走到httpBackend的Do方法

func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) {
	r := h.GetMatchingRule(request.URL.Host)
	if r != nil {
		r.waitChan <- true
		defer func(r *LimitRule) {
			randomDelay := time.Duration(0)
			if r.RandomDelay != 0 {
				randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay)))
			}
			time.Sleep(r.Delay + randomDelay)
			<-r.waitChan
		}(r)
	}

        第2行通過域名查詢對應的規則,如果找到,則在第4行嘗試往channel中加入元素。這個操作相當於上鎖。如果channel此時是滿的,則該流程會被掛起。否則就執行之後的流程。在Do函式結束,命中規則的會執行上面的匿名函式,它在休眠規則配置的時間後,嘗試從channel中獲取資料。這個操作相當於釋放鎖。

        Colly就是通過channel的特性實現了並行控制。

並行

        在“規則”一節,我們講到可以通過Parallelism控制並行goroutine的數量。httpBackend的Do方法最終將被Collector的fetch方法呼叫,而該方法可以被非同步執行,即是一個goroutine。這就意味著承載Do邏輯的goroutine執行完畢後就會退出。而一種類似執行緒的技術在Colly也被支援,它更像一個生產者消費者模型。消費者執行緒執行完一個任務後不會退出,而在生產者生產出的物料池中取出未處理的任務加以處理。

        以下程式碼擷取於Queue

	q, _ := queue.New(
		2, // Number of consumer threads
		&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
	)

	……

	for i := 0; i < 5; i++ {
		// Add URLs to the queue
		q.AddURL(fmt.Sprintf("%s?n=%d", url, i))
	}
	// Consume URLs
	q.Run(c)

        這次沒有呼叫Collector的Visit等函式,而是呼叫了Queue的Run。

        第2行建立了一個具有2個消費者(goroutine)的Queue。第10行預先給這個Queue加入5個需要訪問的URL。

// AddURL adds a new URL to the queue
func (q *Queue) AddURL(URL string) error {
	u, err := url.Parse(URL)
	if err != nil {
		return err
	}
	r := &colly.Request{
		URL:    u,
		Method: "GET",
	}
	d, err := r.Marshal()
	if err != nil {
		return err
	}
	return q.storage.AddRequest(d)
}

        AddUrl的第11行將請求序列化,在第15行將該序列化資料儲存到“倉庫”中。

        在Run方法中,Colly將啟動2個goroutine。注意它是使用for迴圈組織的,這意味著如果for內無break,它會一直迴圈執行下去——不退出。

func (q *Queue) Run(c *colly.Collector) error {
	wg := &sync.WaitGroup{}
	for i := 0; i < q.Threads; i++ {
		wg.Add(1)
		go func(c *colly.Collector, wg *sync.WaitGroup) {
			defer wg.Done()
			for {

        如果佇列中沒有需要處理的request,則會嘗試退出

				if q.IsEmpty() {
					if q.activeThreadCount == 0 {
						break
					}
					ch := make(chan bool)
					q.lock.Lock()
					q.threadChans = append(q.threadChans, ch)
					q.lock.Unlock()
					action := <-ch
					if action == stop && q.IsEmpty() {
						break
					}
				}

        activeThreadCount表示當前執行中的消費者goroutine數量。如果已經沒有消費者了,則直接跳出for迴圈,整個goroutine結束。

        如果還有消費者,則建立一個channel,並將其加入到q.threadChans的channel切片中。然後在第9行等待該channel被寫入值。如果寫入的是true並且此時沒有需要處理的request,則退出goroutine。可以看到這段邏輯檢測了兩次是否有request,這個我們之後再討論。

        如果還有request要處理,則遞增消費者數量(在finish中會遞減以抵消)。然後從“倉庫”中取出一個任務,在通過Request的Do方法發起請求,最後呼叫finish方法善後。

				q.lock.Lock()
				atomic.AddInt32(&q.activeThreadCount, 1)
				q.lock.Unlock()
				rb, err := q.storage.GetRequest()
				if err != nil || rb == nil {
					q.finish()
					continue
				}
				r, err := c.UnmarshalRequest(rb)
				if err != nil || r == nil {
					q.finish()
					continue
				}
				r.Do()
				q.finish()
			}
		}(c, wg)
	}
	wg.Wait()
	return nil
}

        finish方法幹了三件事:

  1. 遞減消費者數量,以抵消Run方法中的遞增。
  2. 將Queue的各個等待中的,其他goroutine建立的channel傳入true值,即告知他們可以退出了。
  3. 給Queue建立一個空的channel切片
func (q *Queue) finish() {
	q.lock.Lock()
	q.activeThreadCount--
	for _, c := range q.threadChans {
		c <- stop
	}
	q.threadChans = make([]chan bool, 0, q.Threads)
	q.lock.Unlock()
}

        我們再看下怎麼在請求的過程中給Queue增加任務

// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {
	d, err := r.Marshal()
	if err != nil {
		return err
	}
	if err := q.storage.AddRequest(d); err != nil {
		return err
	}
	q.lock.Lock()
	for _, c := range q.threadChans {
		c <- !stop
	}
	q.threadChans = make([]chan bool, 0, q.Threads)
	q.lock.Unlock()
	return nil
}

        第3~9行,會將請求序列化後儲存到“倉庫”中。

        第10~15行,會將其他goroutine建立的channel傳入false,告知它們不要退出。然後再建立一個空的channel切片。

        finish和AddRequest都使用鎖鎖住了所有的邏輯,而且它們都會把其他goroutine建立的channel傳入值,然後將Queue的channel切片清空。這樣就保證這些channel只可能收到一種狀態。由於它自己建立的channel是在finish呼叫完之後才有機會創建出來,所以不會造成死鎖。

        再回來看goroutine退出的邏輯

				if q.IsEmpty() {
					if q.activeThreadCount == 0 {
						break
					}
					ch := make(chan bool)
					q.lock.Lock()
					q.threadChans = append(q.threadChans, ch)
					q.lock.Unlock()
					action := <-ch
					if action == stop && q.IsEmpty() {
						break
					}
				}

        如果finish方法中遞減的activeThreadCount為0,這說明這是最後一個goroutine了,而且當前也沒request,所以退出。當然此時存在一種可能:在1行執行結束後,其他非消費者goroutine呼叫AddRequest新增了若干request。而執行第2行時,goroutine將退出,從而導致存在request沒有處理的可能。

        如果還存在其他goroutine,則本goroutine將在第5行建立一個channel,並將這個channel加入到Queue的channel切片中。供其他goroutine呼叫finish往channel中傳入true,或者AddRequest傳入false,調控是否需要退出本過程。在第9行等待channel傳出資料前,可能存在如下幾種情況:

  1. 執行了finish
  2. 執行了AddRequest
  3. 執行了finish後執行了AddRequest
  4. 執行了AddRequest後執行了finish

        如果是第1和4種,action將是false。第2和3種,action是true。但是這個情況下不能單純的通過action決定是否退出。因為第9和10行執行需要時間,這段時間其他goroutine可能還會執行AddRequest新增任務,或者GetRequest刪除任務。所以還要在第10行檢測下IsEmpty。

        這段是我閱讀Colly中思考的最多的程式碼,因為有goroutine和channel,導致整個邏輯比較複雜。也感慨下,雖然goroutine很方便,但是真的能把它寫對也是不容易的。

分散式

        在Queue例子中,我們看到“倉庫”這個概念。回顧下Queue的例子,“倉庫”是InMemoryQueueStorage。顧名思義,它是一個記憶體型的倉庫,所以不存在分散式基礎。

	// create a request queue with 2 consumer threads
	q, _ := queue.New(
		2, // Number of consumer threads
		&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
	)

        一個分散式的例子是Redis backend,擷取一段

	// create the redis storage
	storage := &redisstorage.Storage{
		Address:  "127.0.0.1:6379",
		Password: "",
		DB:       0,
		Prefix:   "httpbin_test",
	}

	// add storage to the collector
	err := c.SetStorage(storage)
	if err != nil {
		panic(err)
	}

	// delete previous data from storage
	if err := storage.Clear(); err != nil {
		log.Fatal(err)
	}

	// close redis client
	defer storage.Client.Close()

	// create a new request queue with redis storage backend
	q, _ := queue.New(2, storage)

        這兒建立了一個redis型的倉庫。不僅Collector的Storage是它,Queue的Storage也是它。這樣一個叢集上的服務都往這個倉庫裡存入和取出資料,從而實現分散式架構。

        redisstorage庫引自github.com/gocolly/redisstorage。我們檢視其原始碼,其實現了Collector的storage需要的介面

type Storage interface {
	// Init initializes the storage
	Init() error
	// Visited receives and stores a request ID that is visited by the Collector
	Visited(requestID uint64) error
	// IsVisited returns true if the request was visited before IsVisited
	// is called
	IsVisited(requestID uint64) (bool, error)
	// Cookies retrieves stored cookies for a given host
	Cookies(u *url.URL) string
	// SetCookies stores cookies for a given host
	SetCookies(u *url.URL, cookies string)
}

        以及Queue的storage需要的

// Storage is the interface of the queue's storage backend
type Storage interface {
	// Init initializes the storage
	Init() error
	// AddRequest adds a serialized request to the queue
	AddRequest([]byte) error
	// GetRequest pops the next request from the queue
	// or returns error if the queue is empty
	GetRequest() ([]byte, error)
	// QueueSize returns with the size of the queue
	QueueSize() (int, error)
}