1. 程式人生 > 實用技巧 >使用Python抓取google搜尋結果

使用Python抓取google搜尋結果

1. 搜尋引擎的選取

選擇一個好的搜尋引擎意味著你能夠得到更準確的搜尋結果。我用過的搜尋引擎有四種:Google、Bing、Baidu、Yahoo!。 作為程式設計師,我首選Google。但當我看見我最愛的Google返回給我的全是一堆的js程式碼,根本沒我想要的搜尋結果。於是我轉而投向了Bing的陣營,在用過一段時間後我發現Bing返回的搜尋結果對於我的問題來說不太理想。正當我要絕望時,Google拯救了我。原來Google為了照顧那些禁止瀏覽器使用js的使用者,還有另外一種搜尋方式,請看下面的搜尋URL:

https://www.google.com.hk/search?hl=en&q=hello

hl指定要搜尋的語言,q就是你要搜尋的關鍵字。 好了,感謝Google,搜尋結果頁面包含我要抓取的內容。

PS: 網上很多利用python抓取Google搜尋結果還是利用https://ajax.googleapis.com/ajax/services/search/web... 的方法。需要注意的是這個方法Google已經不再推薦使用了,見https://developers.google.com/web-search/docs/。Google現在提供了Custom Search API, 不過API限制每天100次請求,如果需要更多則只能花錢買。

2. Python抓取並分析網頁

利用Python抓取網頁很方便,不多說,見程式碼:

def search(self, queryStr):
     queryStr = urllib2.quote(queryStr)
     url = 'https://www.google.com.hk/search?hl=en&q=%s' % queryStr
     request = urllib2.Request(url)
     response = urllib2.urlopen(request)
     html = response.read()
     results = self.extractSearchResults(html)

第6行的 html 就是我們抓取的搜尋結果頁面原始碼。使用過Python的同學會發現,Python同時提供了urllib 和 urllib2兩個模組,都是和URL請求相關的模組,不過提供了不同的功能,urllib只可以接收URL,而urllib2可以接受一個Request類的例項來設定URL請求的headers,這意味著你可以偽裝你的user agent 等(下面會用到)。

現在我們已經可以用Python抓取網頁並儲存下來,接下來我們就可以從原始碼頁面中抽取我們想要的搜尋結果。Python提供了htmlparser模組,不過用起來相對比較麻煩,這裡推薦一個很好用的網頁分析包BeautifulSoup,關於BeautifulSoup的用法官網有詳細的介紹,這裡我不再多說。

利用上面的程式碼,對於少量的查詢還比較OK,但如果要進行上千上萬次的查詢,上面的方法就不再有效了, Google會檢測你請求的來源,如果我們利用機器頻繁爬取Google的搜尋結果,不多久就Google會block你的IP,並給你返回503 Error頁面。這不是我們想要的結果,於是我們還要繼續探索

前面提到利用urllib2我們可以設定URL請求的headers, 偽裝我們的user agent。簡單的說,user agent就是客戶端瀏覽器等應用程式使用的一種特殊的網路協議, 在每次瀏覽器(郵件客戶端/搜尋引擎蜘蛛)進行 HTTP 請求時傳送到伺服器,伺服器就知道了使用者是使用什麼瀏覽器(郵件客戶端/搜尋引擎蜘蛛)來訪問的。 有時候為了達到一些目的,我們不得不去善意的欺騙伺服器告訴它我不是在用機器訪問你。

於是,我們的程式碼就成了下面這個樣子:

user_agents = ['Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20130406 Firefox/23.0', \
         'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:18.0) Gecko/20100101 Firefox/18.0', \
         'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/533+ \
         (KHTML, like Gecko) Element Browser 5.0', \
         'IBM WebExplorer /v0.94', 'Galaxy/1.0 [en] (Mac OS X 10.5.6; U; en)', \
         'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)', \
         'Opera/9.80 (Windows NT 6.0) Presto/2.12.388 Version/12.14', \
         'Mozilla/5.0 (iPad; CPU OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) \
         Version/6.0 Mobile/10A5355d Safari/8536.25', \
         'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) \
         Chrome/28.0.1468.0 Safari/537.36', \
         'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.0; Trident/5.0; TheWorld)']
 def search(self, queryStr):
     queryStr = urllib2.quote(queryStr)
     url = 'https://www.google.com.hk/search?hl=en&q=%s' % queryStr
     request = urllib2.Request(url)
     index = random.randint(0, 9)
     user_agent = user_agents[index]
     request.add_header('User-agent', user_agent)
     response = urllib2.urlopen(request)
     html = response.read()
     results = self.extractSearchResults(html)

不要被user_agents那個list嚇到,那其實就是10個user agent 字串,這麼做是讓我們偽裝的更好一些,如果你需要更多的user agent 請看這裡 UserAgentString。

17-19行表示隨機選擇一個user agent 字串,然後用request 的add_header方法偽裝一個user agent。

通過偽裝user agent能夠讓我們持續抓取搜尋引擎結果,如果這樣還不行,那我建議在每兩次查詢間隨機休眠一段時間,這樣會影響抓取速度,但是能夠讓你更持續的抓取結果,如果你有多個IP,那抓取的速度也就上來了。

import sys
import os
import urllib2
import socket
import time
import gzip
import StringIO
import re
import random
import types
from dotenv import load_dotenv, find_dotenv
from bs4 import BeautifulSoup
 
reload(sys)
sys.setdefaultencoding('utf-8')
 
# Load config from .env file
# TODO: Error handling
try:
    load_dotenv(find_dotenv(usecwd=True))
    base_url = os.environ.get('BASE_URL')
    results_per_page = int(os.environ.get('RESULTS_PER_PAGE'))
except:
    print "ERROR: Make sure you have .env file with proper config"
    sys.exit(1)
 
user_agents = list()
 
# results from the search engine
# basically include url, title,content
 
 
class SearchResult:
    def __init__(self):
        self.url = ''
        self.title = ''
        self.content = ''
 
    def getURL(self):
        return self.url
 
    def setURL(self, url):
        self.url = url
 
    def getTitle(self):
        return self.title
 
    def setTitle(self, title):
        self.title = title
 
    def getContent(self):
        return self.content
 
    def setContent(self, content):
        self.content = content
 
    def printIt(self, prefix=''):
        print 'url\t->', self.url
        print 'title\t->', self.title
        print 'content\t->', self.content
        print
 
    def writeFile(self, filename):
        file = open(filename, 'a')
        try:
            file.write('url:' + self.url + '\n')
            file.write('title:' + self.title + '\n')
            file.write('content:' + self.content + '\n\n')
        except IOError, e:
            print 'file error:', e
        finally:
            file.close()
 
 
class GoogleAPI:
    def __init__(self):
        timeout = 40
        socket.setdefaulttimeout(timeout)
 
    def randomSleep(self):
        sleeptime = random.randint(60, 120)
        time.sleep(sleeptime)
 
    def extractDomain(self, url):
        """Return string
        extract the domain of a url
        """
        domain = ''
        pattern = re.compile(r'http[s]?://([^/]+)/', re.U | re.M)
        url_match = pattern.search(url)
        if(url_match and url_match.lastindex > 0):
            domain = url_match.group(1)
 
        return domain
 
    def extractUrl(self, href):
        """ Return a string
        extract a url from a link
        """
        url = ''
        pattern = re.compile(r'(http[s]?://[^&]+)&', re.U | re.M)
        url_match = pattern.search(href)
        if(url_match and url_match.lastindex > 0):
            url = url_match.group(1)
 
        return url
 
    def extractSearchResults(self, html):
        """Return a list
        extract serach results list from downloaded html file
        """
        results = list()
        soup = BeautifulSoup(html, 'html.parser')
        div = soup.find('div', id='search')
        if (type(div) != types.NoneType):
            lis = div.findAll('div', {'class': 'g'})
            if(len(lis) > 0):
                for li in lis:
                    result = SearchResult()
                    h3 = li.find('h3', {'class': 'r'})
                    if(type(h3) == types.NoneType):
                        continue
                    # extract domain and title from h3 object
                    link = h3.find('a')
                    if (type(link) == types.NoneType):
                        continue
                    url = link['href']
                    url = self.extractUrl(url)
                    if(cmp(url, '') == 0):
                        continue
                    title = link.renderContents()
                    title = re.sub(r'<.+?>', '', title)
                    result.setURL(url)
                    result.setTitle(title)
                    span = li.find('span', {'class': 'st'})
                    if (type(span) != types.NoneType):
                        content = span.renderContents()
                        content = re.sub(r'<.+?>', '', content)
                        result.setContent(content)
                    results.append(result)
        return results
 
    def search(self, query, lang='en', num=results_per_page):
        """Return a list of lists
        search web
        @param query -> query key words
        @param lang -> language of search results
        @param num -> number of search results to return
        """
        search_results = list()
        query = urllib2.quote(query)
        if(num % results_per_page == 0):
            pages = num / results_per_page
        else:
            pages = num / results_per_page + 1
 
        for p in range(0, pages):
            start = p * results_per_page
            url = '%s/search?hl=%s&num=%d&start=%s&q=%s' % (
                base_url, lang, results_per_page, start, query)
            retry = 3
            while(retry > 0):
                try:
                    request = urllib2.Request(url)
                    length = len(user_agents)
                    index = random.randint(0, length-1)
                    user_agent = user_agents[index]
                    request.add_header('User-agent', user_agent)
                    request.add_header('connection', 'keep-alive')
                    request.add_header('Accept-Encoding', 'gzip')
                    request.add_header('referer', base_url)
                    response = urllib2.urlopen(request)
                    html = response.read()
                    if(response.headers.get('content-encoding', None) == 'gzip'):
                        html = gzip.GzipFile(
                            fileobj=StringIO.StringIO(html)).read()
 
                    results = self.extractSearchResults(html)
                    search_results.extend(results)
                    break
                except urllib2.URLError, e:
                    print 'url error:', e
                    self.randomSleep()
                    retry = retry - 1
                    continue
 
                except Exception, e:
                    print 'error:', e
                    retry = retry - 1
                    self.randomSleep()
                    continue
        return search_results
 
 
def load_user_agent():
    fp = open('./user_agents', 'r')
 
    line = fp.readline().strip('\n')
    while(line):
        user_agents.append(line)
        line = fp.readline().strip('\n')
    fp.close()
 
 
def crawler():
    # Load use agent string from file
    load_user_agent()
 
    # Create a GoogleAPI instance
    api = GoogleAPI()
 
    # set expect search results to be crawled
    expect_num = 10
    # if no parameters, read query keywords from file
    if(len(sys.argv) < 2):
        keywords = open('./keywords', 'r')
        keyword = keywords.readline()
        while(keyword):
            results = api.search(keyword, num=expect_num)
            for r in results:
                r.printIt()
            keyword = keywords.readline()
        keywords.close()
    else:
        keyword = sys.argv[1]
        results = api.search(keyword, num=expect_num)
        for r in results:
            r.printIt()
 
 
if __name__ == '__main__':
    crawler()