Scraping Google Search Results with Python
1. Choosing a Search Engine
Choosing a good search engine means getting more accurate search results. I have used four: Google, Bing, Baidu, and Yahoo!. As a programmer, Google was my first choice, but my beloved Google returned me nothing but piles of JS code, with none of the search results I wanted. So I defected to the Bing camp, and after using it for a while I found that Bing's results were not ideal for my queries. Just as I was about to despair, Google saved me. It turns out that, to accommodate users whose browsers have JS disabled, Google offers an alternative way to search. See the following search URL:
https://www.google.com.hk/search?hl=en&q=hello
hl specifies the language to search in, and q is the keyword you want to search for. Good. Thanks to Google, the search results page now contains the content I want to scrape.
PS: Many posts online about scraping Google search results with Python still use the https://ajax.googleapis.com/ajax/services/search/web... approach. Note that Google no longer recommends this method; see https://developers.google.com/web-search/docs/. Google now offers the Custom Search API instead, but the API is limited to 100 requests per day; if you need more, you have to pay.
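For reference, a minimal sketch of calling the Custom Search JSON API looks like the following. API_KEY and CX are placeholders for credentials you would obtain from the Google Developers Console; the scraper in this article does not use this API:

import json
import urllib
import urllib2

API_KEY = 'YOUR_API_KEY'       # placeholder: your Custom Search API key
CX = 'YOUR_SEARCH_ENGINE_ID'   # placeholder: your custom search engine ID

def custom_search(query):
    url = ('https://www.googleapis.com/customsearch/v1?key=%s&cx=%s&q=%s'
           % (API_KEY, CX, urllib.quote(query)))
    data = json.loads(urllib2.urlopen(url).read())
    # each item carries a link, title and snippet, much like scraped results
    for item in data.get('items', []):
        print item['link'], '->', item['title']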
2. Fetching and Parsing Pages with Python
Fetching a web page with Python is easy; without further ado, here is the code:
def search(self, queryStr):
    # percent-encode the query so it can be embedded in the URL
    queryStr = urllib2.quote(queryStr)
    url = 'https://www.google.com.hk/search?hl=en&q=%s' % queryStr
    request = urllib2.Request(url)
    response = urllib2.urlopen(request)
    html = response.read()
    results = self.extractSearchResults(html)
The html variable at the end of this snippet is the source of the search results page we fetched. Anyone who has used Python will have noticed that it offers two URL-request modules, urllib and urllib2, with different capabilities: urllib can only accept a URL, while urllib2 can accept an instance of the Request class to set the headers of a URL request, which means you can fake your user agent and more (we will use this below).
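To make the difference concrete, here is a quick sketch (not part of the scraper itself; the user agent string is just an example):

import urllib
import urllib2

url = 'https://www.google.com.hk/search?hl=en&q=hello'

# urllib: takes only a URL string, with no way to customize request headers
html1 = urllib.urlopen(url).read()

# urllib2: a Request object lets us attach headers before opening the URL
request = urllib2.Request(url)
request.add_header('User-agent', 'Mozilla/5.0')
html2 = urllib2.urlopen(request).read()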
Now that we can fetch a web page with Python and save it, the next step is extracting the search results we want from the page source. Python provides an HTMLParser module, but it is rather cumbersome to use, so here I recommend a very handy page-parsing package, BeautifulSoup. The official site explains BeautifulSoup's usage in detail, so I won't say more about it here.
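Still, a minimal sketch shows the flavor. This assumes html holds the page fetched above, and that each result sits in a div of class 'g', which is simply what Google's markup used at the time of writing, so treat the selector as an assumption:

from bs4 import BeautifulSoup

soup = BeautifulSoup(html, 'html.parser')
# each organic result sits in a div of class 'g'
for div in soup.find_all('div', {'class': 'g'}):
    link = div.find('a')
    if link is not None:
        print link.get('href')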
The code above works fine for a small number of queries, but for thousands or tens of thousands of queries it no longer does. Google detects the source of your requests: if we use machines to crawl Google's search results at high frequency, before long Google will block your IP and return a 503 error page. That is not the result we want, so we keep exploring.
As mentioned earlier, with urllib2 we can set the headers of a URL request and fake our user agent. Simply put, the user agent is a string that client applications such as browsers send to the server with every HTTP request, so the server knows what browser (or mail client, or search engine spider) the user is visiting with. Sometimes, to achieve our goals, we have no choice but to benevolently deceive the server: "I am not a machine visiting you."
With that, our code becomes the following:
user_agents = [
    'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20130406 Firefox/23.0',
    'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:18.0) Gecko/20100101 Firefox/18.0',
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/533+ '
    '(KHTML, like Gecko) Element Browser 5.0',
    'IBM WebExplorer /v0.94',
    'Galaxy/1.0 [en] (Mac OS X 10.5.6; U; en)',
    'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)',
    'Opera/9.80 (Windows NT 6.0) Presto/2.12.388 Version/12.14',
    'Mozilla/5.0 (iPad; CPU OS 6_0 like Mac OS X) AppleWebKit/536.26 '
    '(KHTML, like Gecko) Version/6.0 Mobile/10A5355d Safari/8536.25',
    'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/28.0.1468.0 Safari/537.36',
    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.0; Trident/5.0; TheWorld)',
]
def search(self, queryStr):
    queryStr = urllib2.quote(queryStr)
    url = 'https://www.google.com.hk/search?hl=en&q=%s' % queryStr
    request = urllib2.Request(url)
    # pick a random user agent string and attach it as a request header
    index = random.randint(0, len(user_agents) - 1)
    user_agent = user_agents[index]
    request.add_header('User-agent', user_agent)
    response = urllib2.urlopen(request)
    html = response.read()
    results = self.extractSearchResults(html)
Don't be frightened by that user_agents list: it is really just ten user agent strings, there to make our disguise more convincing. If you need more user agent strings, look here: UserAgentString.
The three lines between Request and urlopen pick one user agent string at random, then fake the user agent with the request's add_header method.
Faking the user agent lets us crawl search engine results continuously. If that still isn't enough, I suggest sleeping a random interval between every two queries; this hurts crawl speed, but lets you crawl results much more sustainably, and if you have multiple IPs, the speed comes back up as well. The complete script below puts all of this together (its randomSleep method does exactly the pacing just described):
import sys
import os
import urllib2
import socket
import time
import gzip
import StringIO
import re
import random

from dotenv import load_dotenv, find_dotenv
from bs4 import BeautifulSoup

reload(sys)
sys.setdefaultencoding('utf-8')

# Load config from .env file
# TODO: Error handling
try:
    load_dotenv(find_dotenv(usecwd=True))
    base_url = os.environ.get('BASE_URL')
    results_per_page = int(os.environ.get('RESULTS_PER_PAGE'))
except Exception:
    print "ERROR: Make sure you have .env file with proper config"
    sys.exit(1)
user_agents = list()

# a single result from the search engine:
# basically a url, a title and a content snippet
class SearchResult:
    def __init__(self):
        self.url = ''
        self.title = ''
        self.content = ''

    def getURL(self):
        return self.url

    def setURL(self, url):
        self.url = url

    def getTitle(self):
        return self.title

    def setTitle(self, title):
        self.title = title

    def getContent(self):
        return self.content

    def setContent(self, content):
        self.content = content

    def printIt(self, prefix=''):
        print 'url\t->', self.url
        print 'title\t->', self.title
        print 'content\t->', self.content
        print

    def writeFile(self, filename):
        file = open(filename, 'a')
        try:
            file.write('url:' + self.url + '\n')
            file.write('title:' + self.title + '\n')
            file.write('content:' + self.content + '\n\n')
        except IOError, e:
            print 'file error:', e
        finally:
            file.close()
class GoogleAPI:
    def __init__(self):
        timeout = 40
        socket.setdefaulttimeout(timeout)

    def randomSleep(self):
        # sleep 60-120 seconds so queries do not arrive in a burst
        sleeptime = random.randint(60, 120)
        time.sleep(sleeptime)

    def extractDomain(self, url):
        """Return a string
        extract the domain of a url
        """
        domain = ''
        pattern = re.compile(r'http[s]?://([^/]+)/', re.U | re.M)
        url_match = pattern.search(url)
        if url_match and url_match.lastindex > 0:
            domain = url_match.group(1)
        return domain

    def extractUrl(self, href):
        """Return a string
        extract a url from a link
        """
        url = ''
        pattern = re.compile(r'(http[s]?://[^&]+)&', re.U | re.M)
        url_match = pattern.search(href)
        if url_match and url_match.lastindex > 0:
            url = url_match.group(1)
        return url
    def extractSearchResults(self, html):
        """Return a list
        extract search results list from downloaded html file
        """
        results = list()
        soup = BeautifulSoup(html, 'html.parser')
        div = soup.find('div', id='search')
        if div is not None:
            lis = div.findAll('div', {'class': 'g'})
            for li in lis:
                result = SearchResult()
                h3 = li.find('h3', {'class': 'r'})
                if h3 is None:
                    continue
                # extract url and title from the h3 object
                link = h3.find('a')
                if link is None:
                    continue
                url = link['href']
                url = self.extractUrl(url)
                if url == '':
                    continue
                title = link.renderContents()
                title = re.sub(r'<.+?>', '', title)
                result.setURL(url)
                result.setTitle(title)
                span = li.find('span', {'class': 'st'})
                if span is not None:
                    content = span.renderContents()
                    content = re.sub(r'<.+?>', '', content)
                    result.setContent(content)
                results.append(result)
        return results
    def search(self, query, lang='en', num=results_per_page):
        """Return a list of SearchResult
        search web
        @param query -> query key words
        @param lang -> language of search results
        @param num -> number of search results to return
        """
        search_results = list()
        query = urllib2.quote(query)
        # how many result pages we need to fetch
        if num % results_per_page == 0:
            pages = num / results_per_page
        else:
            pages = num / results_per_page + 1
        for p in range(0, pages):
            start = p * results_per_page
            url = '%s/search?hl=%s&num=%d&start=%d&q=%s' % (
                base_url, lang, results_per_page, start, query)
            retry = 3
            while retry > 0:
                try:
                    request = urllib2.Request(url)
                    # disguise each request with a random user agent
                    index = random.randint(0, len(user_agents) - 1)
                    user_agent = user_agents[index]
                    request.add_header('User-agent', user_agent)
                    request.add_header('connection', 'keep-alive')
                    request.add_header('Accept-Encoding', 'gzip')
                    request.add_header('referer', base_url)
                    response = urllib2.urlopen(request)
                    html = response.read()
                    # decompress if the server honored Accept-Encoding: gzip
                    if response.headers.get('content-encoding', None) == 'gzip':
                        html = gzip.GzipFile(
                            fileobj=StringIO.StringIO(html)).read()
                    results = self.extractSearchResults(html)
                    search_results.extend(results)
                    break
                except urllib2.URLError, e:
                    print 'url error:', e
                    self.randomSleep()
                    retry = retry - 1
                    continue
                except Exception, e:
                    print 'error:', e
                    retry = retry - 1
                    self.randomSleep()
                    continue
        return search_results
def load_user_agent():
    # read one user agent string per line, skipping blank lines
    fp = open('./user_agents', 'r')
    for line in fp:
        line = line.strip('\n')
        if line:
            user_agents.append(line)
    fp.close()
def crawler():
    # Load user agent strings from file
    load_user_agent()
    # Create a GoogleAPI instance
    api = GoogleAPI()
    # number of search results to crawl per query
    expect_num = 10
    # if no parameters, read query keywords from file
    if len(sys.argv) < 2:
        keywords = open('./keywords', 'r')
        for keyword in keywords:
            keyword = keyword.strip()
            if not keyword:
                continue
            results = api.search(keyword, num=expect_num)
            for r in results:
                r.printIt()
        keywords.close()
    else:
        keyword = sys.argv[1]
        results = api.search(keyword, num=expect_num)
        for r in results:
            r.printIt()

if __name__ == '__main__':
    crawler()
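To try the script out (call it crawler.py; the name is arbitrary), you need three files next to it. First, a .env with the two keys the script reads, for example:

BASE_URL=https://www.google.com.hk
RESULTS_PER_PAGE=10

Second, a user_agents file with one user agent string per line; and third, for batch mode, a keywords file with one query per line. Then running python crawler.py crawls every keyword in the file, while python crawler.py 'hello world' searches a single query, printing each result's url, title and content.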