Colly原始碼解析——主體流程
Colly是一個使用golang實現的資料抓取框架,我們可以使用它快速搭建類似網路爬蟲這樣的應用。本文我們將剖析其原始碼,以探析其中奧祕。(轉載請指明出於breaksoftware的csdn部落格)
Collector是Colly的核心結構體,其中包含了使用者對框架行為的定義。一般情況下,我們可以使用NewCollector方法構建一個它的指標
// NewCollector creates a new Collector instance with default configuration func NewCollector(options ...func(*Collector)) *Collector { c := &Collector{} c.Init() for _, f := range options { f(c) } c.parseSettingsFromEnv() return c }
第4行呼叫了Init方法初始化了Collector的一些成員。然後遍歷並呼叫不定長引數,這些引數都是函式型別——func(*Collector)。我們看個例子
c := colly.NewCollector( // Visit only domains: coursera.org, www.coursera.org colly.AllowedDomains("coursera.org", "www.coursera.org"), // Cache responses to prevent multiple download of pages // even if the collector is restarted colly.CacheDir("./coursera_cache"), )
AllowedDomains和CacheDir都返回一個匿名函式,其邏輯就是將Collector物件中對應的成員設定為指定的值
// AllowedDomains sets the domain whitelist used by the Collector.
func AllowedDomains(domains ...string) func(*Collector) {
return func(c *Collector) {
c.AllowedDomains = domains
}
}
Collector中絕大部分成員均有對應的方法,而且它們的名稱(函式名和成員名)也一致。但是其中只有3個方法——ParseHTTPErrorResponse、AllowURLRevisit和IgnoreRobotsTxt比較特殊,因為它們沒有引數。如果被呼叫,則對應的Collector成員會被設定為true
// AllowURLRevisit instructs the Collector to allow multiple downloads of the same URL
func AllowURLRevisit() func(*Collector) {
return func(c *Collector) {
c.AllowURLRevisit = true
}
}
再回到NewCollector函式,其最後一個邏輯是呼叫parseSettingsFromEnv方法。從名稱我們可以看出它是用於解析環境變數的。將它放在最後是可以理解的,因為後面執行的邏輯可以覆蓋前面的邏輯。這樣我們可以讓環境變數對應的設定生效。
func (c *Collector) parseSettingsFromEnv() {
for _, e := range os.Environ() {
if !strings.HasPrefix(e, "COLLY_") {
continue
}
pair := strings.SplitN(e[6:], "=", 2)
if f, ok := envMap[pair[0]]; ok {
f(c, pair[1])
} else {
log.Println("Unknown environment variable:", pair[0])
}
}
}
它從os.Environ()中獲取系統環境變數,然後遍歷它們。對於以COLLY_開頭的變數,找到其在envMap中的對應方法,並呼叫之以覆蓋之前設定的Collector成員變數值。envMap是一個<string,func>的對映,它是包內全域性的。
var envMap = map[string]func(*Collector, string){
"ALLOWED_DOMAINS": func(c *Collector, val string) {
c.AllowedDomains = strings.Split(val, ",")
},
"CACHE_DIR": func(c *Collector, val string) {
c.CacheDir = val
},
……
初始化完Collector,我們就可以讓其傳送請求。目前Colly公開了5個方法,其中3個是和Post相關的:Post、PostRaw和PostMultipart。一個Get請求方法:Visit。以及一個使用者可以高度定製的方法:Request。這些方法底層都呼叫了scrape方法。比如Visit的實現是
func (c *Collector) Visit(URL string) error {
return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}
scrape
scrape方法是需要我們展開分析的。因為它是Colly庫中兩個最重要的方法之一。
// scrape method
func (c *Collector) scrape(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, checkRevisit bool) error {
if err := c.requestCheck(u, method, depth, checkRevisit); err != nil {
return err
}
首先requestCheck方法檢測一些和遞迴深度以及URL相關的資訊
func (c *Collector) requestCheck(u, method string, depth int, checkRevisit bool) error {
if u == "" {
return ErrMissingURL
}
if c.MaxDepth > 0 && c.MaxDepth < depth {
return ErrMaxDepth
}
Collector的MaxDepth預設設定為0,即不用比較深度。如果它被設定值,則遞迴深度不可以超過它。
然後檢測URL是否在被禁止的URL過濾器中。如果在,則返回錯誤。
if len(c.DisallowedURLFilters) > 0 {
if isMatchingFilter(c.DisallowedURLFilters, []byte(u)) {
return ErrForbiddenURL
}
}
之後檢測URL是否在准入的URL過濾器中。如果不在,則返回錯誤
if len(c.URLFilters) > 0 {
if !isMatchingFilter(c.URLFilters, []byte(u)) {
return ErrNoURLFiltersMatch
}
}
最後針對GET請求,檢查其是否被請求過。
if checkRevisit && !c.AllowURLRevisit && method == "GET" {
h := fnv.New64a()
h.Write([]byte(u))
uHash := h.Sum64()
visited, err := c.store.IsVisited(uHash)
if err != nil {
return err
}
if visited {
return ErrAlreadyVisited
}
return c.store.Visited(uHash)
}
return nil
}
通過這些檢測後,scrape會對URL組成進行分析補齊
// scrape method
parsedURL, err := url.Parse(u)
if err != nil {
return err
}
if parsedURL.Scheme == "" {
parsedURL.Scheme = "http"
}
然後針對host進行精確匹配(在requestCheck中,是對URL使用正則進行匹配)。先檢測host是否在被禁止的列表中,然後檢測其是否在准入的列表中。
// scrape method
if !c.isDomainAllowed(parsedURL.Host) {
return ErrForbiddenDomain
}
func (c *Collector) isDomainAllowed(domain string) bool {
for _, d2 := range c.DisallowedDomains {
if d2 == domain {
return false
}
}
if c.AllowedDomains == nil || len(c.AllowedDomains) == 0 {
return true
}
for _, d2 := range c.AllowedDomains {
if d2 == domain {
return true
}
}
return false
}
通過上面檢測,還需要檢查是否需要遵從Robots協議
// scrape method
if !c.IgnoreRobotsTxt {
if err = c.checkRobots(parsedURL); err != nil {
return err
}
}
所有檢測通過後,就需要填充請求了
// scrape method
if hdr == nil {
hdr = http.Header{"User-Agent": []string{c.UserAgent}}
}
rc, ok := requestData.(io.ReadCloser)
if !ok && requestData != nil {
rc = ioutil.NopCloser(requestData)
}
req := &http.Request{
Method: method,
URL: parsedURL,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: hdr,
Body: rc,
Host: parsedURL.Host,
}
setRequestBody(req, requestData)
第5~8行,使用型別斷言等方法,將請求的資料(requestData)轉換成io.ReadCloser介面資料。setRequestBody方法則是根據資料(requestData)的原始型別,設定Request結構中的GetBody方法
func setRequestBody(req *http.Request, body io.Reader) {
if body != nil {
switch v := body.(type) {
case *bytes.Buffer:
req.ContentLength = int64(v.Len())
buf := v.Bytes()
req.GetBody = func() (io.ReadCloser, error) {
r := bytes.NewReader(buf)
return ioutil.NopCloser(r), nil
}
case *bytes.Reader:
req.ContentLength = int64(v.Len())
snapshot := *v
req.GetBody = func() (io.ReadCloser, error) {
r := snapshot
return ioutil.NopCloser(&r), nil
}
case *strings.Reader:
req.ContentLength = int64(v.Len())
snapshot := *v
req.GetBody = func() (io.ReadCloser, error) {
r := snapshot
return ioutil.NopCloser(&r), nil
}
}
if req.GetBody != nil && req.ContentLength == 0 {
req.Body = http.NoBody
req.GetBody = func() (io.ReadCloser, error) { return http.NoBody, nil }
}
}
}
這種抽象方式,使得不同型別的requestData都可以通過統一的GetBody方法獲取內容。目前Colly中傳送資料有3種複合結構,分別是:map[string]string、requestData []byte和map[string][]byte。對於普通的Post傳送map[string]string資料,Colly會使用createFormReader方法將其轉換成Reader結構指標
func createFormReader(data map[string]string) io.Reader {
form := url.Values{}
for k, v := range data {
form.Add(k, v)
}
return strings.NewReader(form.Encode())
}
如果是一個二進位制切片,則使用bytes.NewReader直接將其轉換為Reader結構指標
如果是map[string][]byte,則是Post資料的Multipart結構,使用createMultipartReader方法將其轉換成Buffer結構指標。
func createMultipartReader(boundary string, data map[string][]byte) io.Reader {
dashBoundary := "--" + boundary
body := []byte{}
buffer := bytes.NewBuffer(body)
buffer.WriteString("Content-type: multipart/form-data; boundary=" + boundary + "\n\n")
for contentType, content := range data {
buffer.WriteString(dashBoundary + "\n")
buffer.WriteString("Content-Disposition: form-data; name=" + contentType + "\n")
buffer.WriteString(fmt.Sprintf("Content-Length: %d \n\n", len(content)))
buffer.Write(content)
buffer.WriteString("\n")
}
buffer.WriteString(dashBoundary + "--\n\n")
return buffer
}
回到scrape方法中,資料準備結束,開始正式獲取資料
// scrape method
u = parsedURL.String()
c.wg.Add(1)
if c.Async {
go c.fetch(u, method, depth, requestData, ctx, hdr, req)
return nil
}
return c.fetch(u, method, depth, requestData, ctx, hdr, req)
}
通過第4行我們可以看到,可以通過Async引數決定是否非同步的獲取資料。
fetch
在解析fetch方法前,我們要先介紹Collector的幾個回撥函式
htmlCallbacks []*htmlCallbackContainer
xmlCallbacks []*xmlCallbackContainer
requestCallbacks []RequestCallback
responseCallbacks []ResponseCallback
errorCallbacks []ErrorCallback
scrapedCallbacks []ScrapedCallback
以requestCallbacks為例,Colly提供了OnRequest方法用於註冊回撥。由於這些回撥函式通過切片儲存,所以可以多次呼叫註冊方法。(即不是覆蓋之前的註冊回撥)
// OnRequest registers a function. Function will be executed on every
// request made by the Collector
func (c *Collector) OnRequest(f RequestCallback) {
c.lock.Lock()
if c.requestCallbacks == nil {
c.requestCallbacks = make([]RequestCallback, 0, 4)
}
c.requestCallbacks = append(c.requestCallbacks, f)
c.lock.Unlock()
}
使用者則可以使用下面方法進行註冊
// Before making a request print "Visiting ..."
c.OnRequest(func(r *colly.Request) {
fmt.Println("Visiting", r.URL.String())
})
這些回撥會被在handleOnXXXX型別的函式中被呼叫。呼叫的順序和註冊的順序一致。
func (c *Collector) handleOnResponse(r *Response) {
if c.debugger != nil {
c.debugger.Event(createEvent("response", r.Request.ID, c.ID, map[string]string{
"url": r.Request.URL.String(),
"status": http.StatusText(r.StatusCode),
}))
}
for _, f := range c.responseCallbacks {
f(r)
}
}
每次呼叫fetch方法都會構建一個全新Request結構。
// fetch method
func (c *Collector) fetch(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, req *http.Request) error {
defer c.wg.Done()
if ctx == nil {
ctx = NewContext()
}
request := &Request{
URL: req.URL,
Headers: &req.Header,
Ctx: ctx,
Depth: depth,
Method: method,
Body: requestData,
collector: c,
ID: atomic.AddUint32(&c.requestCount, 1),
}
這兒注意一下3~5行ctx(上下文)的構建邏輯。如果傳入的ctx為nil,則構建一個新的,否則使用老的。這就意味著Request結構體(以及之後出現的Response結構體)中的ctx可以是每次呼叫fetch時全新產生的,也可以是各個Request公用的。我們回溯下ctx的呼叫棧,發現只有func (c *Collector) Request(……)方法使用的不是nil
func (c *Collector) Request(method, URL string, requestData io.Reader, ctx *Context, hdr http.Header) error {
return c.scrape(URL, method, 1, requestData, ctx, hdr, true)
}
這也就意味著,呼叫Visit、Post、PostRaw和PostMultipart方法在每次呼叫fetch時都會產生一個新的上下文。
由於Context存在被多個goroutine共享訪問的可能性,所以其定義了讀寫鎖進行保護
type Context struct {
contextMap map[string]interface{}
lock *sync.RWMutex
}
再回到fetch方法。資料填充完畢後,就提供了一次給使用者干預之後流程的機會
// fetch method
c.handleOnRequest(request)
if request.abort {
return nil
}
之前我們講解過,handleOnRequest呼叫的是使用者通過OnRequest註冊個所有回撥函式。如果使用者在該回調中呼叫了下面方法,則之後的流程都不走了。
// Abort cancels the HTTP request when called in an OnRequest callback
func (r *Request) Abort() {
r.abort = true
}
如果使用者沒用終止執行,則開始傳送請求
// fetch method
if method == "POST" && req.Header.Get("Content-Type") == "" {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
}
if req.Header.Get("Accept") == "" {
req.Header.Set("Accept", "*/*")
}
origURL := req.URL
response, err := c.backend.Cache(req, c.MaxBodySize, c.CacheDir)
對於這次請求,不管是否出錯都會觸發使用者定義的Error回撥
// fetch method
if err := c.handleOnError(response, err, request, ctx); err != nil {
return err
}
在handleOnError函式中,回撥函式會接收到err原因,所以使用者自定義的錯誤處理函式需要通過該值來做區分。
for _, f := range c.errorCallbacks {
f(response, err)
}
return err
正常請求後,fetch會使用ctx和修復後的request填充到response中
// fetch method
if req.URL != origURL {
request.URL = req.URL
request.Headers = &req.Header
}
if proxyURL, ok := req.Context().Value(ProxyURLKey).(string); ok {
request.ProxyURL = proxyURL
}
atomic.AddUint32(&c.responseCount, 1)
response.Ctx = ctx
response.Request = request
err = response.fixCharset(c.DetectCharset, request.ResponseCharacterEncoding)
if err != nil {
return err
}
最後在一系列呼叫使用者回撥中結束fetch
// fetch method
c.handleOnResponse(response)
err = c.handleOnHTML(response)
if err != nil {
c.handleOnError(response, err, request, ctx)
}
err = c.handleOnXML(response)
if err != nil {
c.handleOnError(response, err, request, ctx)
}
c.handleOnScraped(response)
return err
}