python搭建簡單爬蟲框架,爬取獵聘網的招聘職位資訊
阿新 • • 發佈:2019-01-26
該專案主要由五個部分共同完成爬取任務,分別是:URL管理器,HTML下載器,HTML解析器,資料儲存器,爬蟲排程器。
具體程式碼如下:
URL管理器:
import hashlib
import pickle
import time


class UrlManager(object):
    """Track pending, visited and failed URLs for the crawler.

    Pending URLs are kept verbatim in ``new_urls``. URLs that have been
    handed out by ``get_new_url`` are remembered in ``old_urls`` as MD5 hex
    digests. URLs reported through ``add_error_urls`` are kept verbatim in
    ``error_urls``.
    """

    def __init__(self):
        self.new_urls = set()    # URLs waiting to be crawled
        self.old_urls = set()    # MD5 hex digests of URLs already handed out
        self.error_urls = set()  # URLs reported as failed

    @staticmethod
    def _md5(url):
        """Return the MD5 hex digest of *url* encoded as UTF-8."""
        return hashlib.md5(url.encode('utf-8')).hexdigest()

    def get_new_url(self):
        """Pop one pending URL, record its MD5 digest as visited, and return it.

        :return: the URL string removed from ``new_urls``
        :raises KeyError: if ``new_urls`` is empty (from ``set.pop``)
        """
        new_url = self.new_urls.pop()
        self.old_urls.add(self._md5(new_url))
        return new_url

    def old_urls_size(self):
        """Return the number of visited-URL digests."""
        return len(self.old_urls)

    def new_urls_size(self):
        """Return the number of pending URLs."""
        return len(self.new_urls)

    def add_new_url(self, url):
        """Queue a single URL unless it is ``None`` or was already visited.

        :param url: URL string, or ``None`` (reported and ignored)
        :return: None
        """
        if url is None:
            print('url is None!')
            # Without this return the code below would call None.encode()
            # and raise AttributeError right after printing the warning.
            return
        if self._md5(url) not in self.old_urls:
            # set.add is already a no-op for a URL that is still pending.
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        """Queue every URL from an iterable via ``add_new_url``.

        :param urls: iterable of URL strings, or ``None`` (reported and ignored)
        :return: None
        """
        if urls is None:
            print('urls is None!')
            # Iterating None would raise TypeError; stop after the warning.
            return
        for url in urls:
            self.add_new_url(url)

    def add_error_urls(self, url):
        """Record a URL whose request or parsing failed.

        :param url: the failing URL
        :return: None
        """
        self.error_urls.add(url)

    def save_progress(self, path, data):
        """Pickle *data* to the file at *path*, overwriting it.

        :param path: destination file path
        :param data: object to serialise (callers pass a set of URLs/digests)
        :return: None
        """
        with open(path, 'wb') as f:
            pickle.dump(data, f)

    def load_progress(self, path):
        """Load a previously pickled object from *path*.

        :param path: progress file path
        :return: the unpickled object, or an empty ``set()`` when the file
                 does not exist
        """
        try:
            with open(path, 'rb') as f:
                # SECURITY: pickle.load can execute arbitrary code; only load
                # progress files that this program wrote itself.
                tmp = pickle.load(f)
                print('繼續%s的程序' % path)
                return tmp
        except FileNotFoundError as e:
            print(e,'無進度檔案,建立:%s'%path)
        return set()
此URL管理器具有去重的功能,爬取過的url不會重複爬取,並且使用了md5技術減少記憶體的消耗。
HTML下載器:
import requests
import random
from URLManager import UrlManager


class HtmlDownloader(object):
    """Download pages with ``requests`` using a randomly chosen User-Agent."""

    # Pool of User-Agent strings. The original list lacked a comma after the
    # first entry, so Python silently concatenated the first two literals into
    # one malformed header value; every entry is now a separate element.
    USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
        "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
        "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
        "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
    )

    def __init__(self, timeout=10):
        """Pick one User-Agent for the lifetime of this downloader.

        :param timeout: seconds passed to ``requests.get`` so that a stalled
                        server cannot block the crawl indefinitely
        """
        # NOTE(review): this UrlManager is created here and is not shared
        # with any other object, so error URLs recorded by the downloader
        # live only on this instance.
        self.url_manager = UrlManager()
        self.timeout = timeout
        self.headers = {'User-Agent': random.choice(self.USER_AGENTS)}

    def downloader(self, url):
        """GET *url* and return the response with its encoding forced to UTF-8.

        A status code outside 200-209 is reported and the URL is recorded in
        ``self.url_manager.error_urls``; the response is returned either way.

        :param url: URL to fetch
        :return: the ``requests`` response object
        :raises requests.exceptions.RequestException: on connection errors or
                 when the timeout elapses
        """
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        response.encoding = 'utf-8'
        if not 200 <= response.status_code <= 209:
            self.url_manager.add_error_urls(url)
            print('response.status_code is %d:%s'%(response.status_code,url))
        return response
HTML下載器使用了偽裝User-Agent的反爬技術,建立下載器時會從預設的五個User-Agent中隨機選取一個作為請求標頭,以降低被網站識別為爬蟲的機率。
HTML解析器:
from lxml import etree
from collections import defaultdict
from urllib.parse import urljoin


class HtmlParser(object):
    """XPath-based extraction of job links, pagination links and job details."""

    def __init__(self):
        # Result container that parser() fills in and returns on every call.
        self.data = defaultdict(list)

    def parser(self, response):
        """Parse one job page into ``{'job_info': ..., 'company_info': ...}``.

        The company section is parsed with ``parse_company_1`` first; if that
        raises, ``parse_company_2`` is used instead.

        :param response: response object exposing ``.text`` and ``.url``
        :return: ``self.data`` (the same mapping object on every call)
        """
        try:
            company = self.parse_company_1(response)
        except Exception as e:
            print(e,' ; parse_company_1解析錯誤,嘗試使用parse_company_2方法解析...')
            company = self.parse_company_2(response)
        job = self.parse_job_info(response)
        self.data.update(job_info=job, company_info=company)
        return self.data

    def parse_job_urls(self, response):
        """Collect the job-detail links found on a listing page.

        :param response: response object exposing ``.text`` and ``.url``
        :return: list of links; links that do not contain ``www.liepin.com``
                 are resolved against ``response.url``
        """
        tree = etree.HTML(response.text)
        hrefs = tree.xpath('//div[@class="job-info"]/h3[@title]/a/@href')
        return [
            href if 'www.liepin.com' in href else self.url_join(response.url, href)
            for href in hrefs
        ]

    def url_join(self, base_url, url):
        """Resolve *url* against *base_url* and return the absolute URL.

        :param base_url: URL of the page the link was found on
        :param url: absolute or relative link
        :return: absolute URL string
        """
        return urljoin(base_url, url)

    def parse_next_page(self, response):
        """Find the "next page" link of a listing page.

        :param response: response object exposing ``.text`` and ``.url``
        :return: absolute URL of the next page, or ``None`` if no link matched
        """
        tree = etree.HTML(response.text)
        matches = tree.xpath('//a[contains(.,"下一頁") and contains(@href,"zhaopin")]/@href')
        if not matches:
            return None
        return self.url_join(response.url, matches[0])

    def parse_company_1(self, response):
        """Extract company details from the first page layout.

        :param response: response object exposing ``.text``
        :return: dict with ``company_url``, ``company_basic_info`` and
                 ``company_introduction``
        :raises IndexError: if the expected company block or link is missing
        """
        tree = etree.HTML(response.text)
        block = tree.xpath('//div[@class="new-compwrap"]')[0]
        url = block.xpath('.//p/a/@href')[0]
        basic = block.xpath('string(.//ul)')
        intro = tree.xpath('string(//div[@class="info-word"])')
        return {
            'company_url': url,
            'company_basic_info': basic,
            'company_introduction': intro,
        }

    def parse_company_2(self, response):
        """Extract company details from the second page layout.

        This layout provides no company link, so ``company_url`` is the
        string ``'None'``.

        :param response: response object exposing ``.text``
        :return: dict with ``company_url``, ``company_basic_info`` and
                 ``company_introduction``
        """
        tree = etree.HTML(response.text)
        basic = tree.xpath('string(//h3[contains(.,"其他資訊")]/following-sibling::div[@class="content content-word"])')
        intro = tree.xpath('string(//h3[contains(.,"企業介紹")]/following-sibling::div[@class]/div[1])')
        return {
            'company_url': 'None',
            'company_basic_info': basic,
            'company_introduction': intro,
        }

    def parse_job_info(self, response):
        """Extract the job title, summary and description from a job page.

        :param response: response object exposing ``.text`` and ``.url``
        :return: dict with ``job_url``, ``job_title``, ``job_basic_info`` and
                 ``job_description``
        :raises IndexError: if the title element is missing
        """
        tree = etree.HTML(response.text)
        title = tree.xpath('//div[contains(@class,"title-info")]/h1[@title]/@title')[0]
        basic = tree.xpath('string(//div[@class="job-title-left"])')
        description = tree.xpath('string(//h3[contains(.,"職位描述")]/following-sibling::div[@class="content content-word"])')
        return {
            'job_url': response.url,
            'job_title': title,
            'job_basic_info': basic,
            'job_description': description,
        }
HTML解析器使用了xpath來提取資料,可以應付兩種不同的網頁資訊提取。
資料儲存器:
import json
import pymongo
import time
class DataOutput(object):
    """Write crawl results as an HTML table, MongoDB documents, JSON or text.

    Every ``output_*`` method that receives a record first normalises it with
    ``clean_data``, which modifies the record in place.
    """

    def output_html_headers(self, path):
        """Append the HTML preamble and the table header row to *path*.

        :param path: HTML file to append to
        """
        with open(path, 'a+', encoding='utf-8') as f:
            f.write('<html>\n<head>\n<title>獵聘python招聘資訊</title>\n<meta charset="UTF-8">\n</head>\n')
            f.write('<body>\n<table width="960" align="center" border="1" rules="all" cellpadding="15">\n')
            # Was bgcolor="# ccc": the embedded space made the colour invalid.
            f.write('<tr bgcolor="#ccc">\n<th>%s</th>\n<th>%s</th>\n<th>%s</th>\n<th>%s</th>\n'%('job_url','job_title','job_basic_info','job_description'))
            f.write('<th>%s</th>\n<th>%s</th>\n<th>%s</th>\n</tr>\n'%('company_url','company_basic_info','company_introduction'))

    def output_html(self, data, path):
        """Append one table row describing *data* to *path*.

        All values come from scraped pages, so they are HTML-escaped before
        being written; otherwise markup in a job description would break the
        table or inject content into the report.

        :param data: mapping with ``job_info`` and ``company_info`` dicts
        :param path: HTML file to append to
        """
        from html import escape  # stdlib; imported locally so the block is self-contained

        data = self.clean_data(data)
        job = data["job_info"]
        company = data["company_info"]

        def link(value):
            # Escape once and reuse for both the href attribute and the text.
            return '<td><a href="{0}" target="_blank">{0}</a></td>\n'.format(escape(str(value), quote=True))

        def cell(value):
            return '<td>%s</td>\n' % escape(str(value), quote=True)

        row = [
            '<tr align="center">\n',
            link(job["job_url"]),
            cell(job["job_title"]),
            cell(job["job_basic_info"]),
            cell(job["job_description"]),
            link(company["company_url"]),
            cell(company["company_basic_info"]),
            cell(company["company_introduction"]),
            '</tr>\n',
        ]
        with open(path, 'a+', encoding='utf-8') as f:
            f.write(''.join(row))

    def output_html_end(self, path):
        """Append the closing table/body/html tags to *path*.

        :param path: HTML file to append to
        """
        with open(path, 'a+', encoding='utf-8') as f:
            f.write('</table>\n</body>\n</html>\n')

    def open_mongodb(self):
        """Connect to MongoDB at localhost:27017 and select ``lieping_job``."""
        self.client = pymongo.MongoClient('localhost:27017')
        self.db = self.client['lieping_job']

    def close_mongdb(self):
        """Close the client opened by ``open_mongodb``."""
        self.client.close()

    def output_mongodb(self, data, collection):
        """Insert the cleaned record into *collection*.

        ``_id`` is set to the current ``time.time()`` value. ``open_mongodb``
        must have been called first.

        :param data: mapping with ``job_info`` and ``company_info`` dicts
        :param collection: name of the target collection
        """
        data = self.clean_data(data)
        data['_id'] = time.time()
        # Collection.insert is deprecated and removed in PyMongo 4;
        # insert_one is the supported single-document call.
        self.db[collection].insert_one(data if isinstance(data, dict) else dict(data))

    def output_json_start(self, path):
        """Create/truncate *path* and open a JSON array.

        The array starts with an empty-string placeholder element so that
        ``output_json`` can always prefix a record with a comma.

        :param path: JSON file to create
        """
        with open(path, 'w', encoding='utf-8') as f:
            f.write('[""')

    def output_json(self, data, path):
        """Append the cleaned record to the JSON array in *path*.

        :param data: mapping with ``job_info`` and ``company_info`` dicts
        :param path: JSON file started with ``output_json_start``
        """
        data = self.clean_data(data)
        with open(path, 'a', encoding='utf-8') as f:
            f.write(',\n')
            json.dump(data if isinstance(data, dict) else dict(data), f, indent=4)

    def output_json_end(self, path):
        """Close the JSON array in *path*.

        :param path: JSON file started with ``output_json_start``
        """
        with open(path, 'a', encoding='utf-8') as f:
            f.write(']')

    def output_text(self, data, path):
        """Append ``str()`` of the cleaned record as one line of *path*.

        :param data: mapping with ``job_info`` and ``company_info`` dicts
        :param path: text file to append to
        """
        with open(path, 'a', encoding='utf-8') as f:
            f.write(str(self.clean_data(data)) + '\n')

    def clean_data(self, data):
        """Strip line breaks and spaces from the free-text fields, in place.

        :param data: mapping with ``job_info`` and ``company_info`` dicts
        :return: the same *data* object, modified
        """
        company = data['company_info']
        job = data['job_info']
        company['company_introduction'] = (
            company['company_introduction'].replace('\r\n', '').replace(' ', '').replace('\xa0', '')
        )
        company['company_basic_info'] = (
            company['company_basic_info'].replace('\r', '').replace('\n', '').replace('\t', '').replace(' ', '')
        )
        job['job_basic_info'] = job['job_basic_info'].replace('\r\n', '').replace(' ', '')
        job['job_description'] = job['job_description'].replace('\r\n', '').replace(' ', '')
        return data
資料儲存器具有清洗資料的功能,使用python的replace字串處理方法,可以儲存為HTML,MongoDB,json,txt。
排程器:
import os
import time
from URLManager import UrlManager
from HTMLDownloader import HtmlDownloader
from HTMLParser import HtmlParser
from DATAOutput import DataOutput
class Crawl(object):
    """Scheduler that wires the URL manager, downloader, parser and output.

    ``crawl_job_urls`` walks the listing pages and queues job links;
    ``crawl_info`` then downloads every queued job page, stores the parsed
    record and saves the crawl progress.
    """

    def __init__(self):
        # One timestamp (whole seconds) shared by all output names of a run.
        stamp = str(time.time()).split('.')[0]
        self.html_path = 'html_data%s.html' % stamp
        self.json_path = 'json_data%s.json' % stamp
        self.txt_path = 'text_data%s.txt' % stamp
        self.collection = 'python%s' % stamp
        self.page_num = 0        # listing pages followed so far
        self.max_page_num = 21   # upper bound on followed listing pages
        self.url_manager = UrlManager()
        self.html_downloader = HtmlDownloader()
        self.html_parser = HtmlParser()
        self.data_output = DataOutput()

    def crawl_job_urls(self, base_url):
        """Queue job links from *base_url* and from the pages that follow it.

        Pagination stops when no next-page link is found, when
        ``max_page_num`` pages have been followed, or when a page fails.

        :param base_url: URL of the first listing page
        """
        response = self.html_downloader.downloader(base_url)
        self.url_manager.add_new_urls(self.html_parser.parse_job_urls(response))
        next_page = self.html_parser.parse_next_page(response)
        # Limit the number of listing pages that are followed.
        while next_page and self.page_num < self.max_page_num:
            next_page = self.html_parser.url_join(response.url, next_page)
            print('抓取第%d頁的職位連結'%(self.page_num+1))
            try:
                r = self.html_downloader.downloader(next_page)
                self.url_manager.add_new_urls(self.html_parser.parse_job_urls(r))
                following = self.html_parser.parse_next_page(r)
            except Exception as e:
                # Record the page that was being fetched. The old handler read
                # r.url, which is unbound when the download itself fails, and
                # it left next_page unchanged so the same page was retried
                # forever. Without this page the next link is unknown: stop.
                self.url_manager.add_error_urls(next_page)
                print(e)
                break
            next_page = following
            self.page_num += 1

    def crawl_info(self):
        """Download and store every queued job page, then save progress.

        Failures on a single URL are recorded in the URL manager's error set
        and do not stop the crawl. Progress files and the HTML footer are
        written even if the loop is interrupted.
        """
        # Open the data store(s).
        #self.data_output.open_mongodb()
        #self.data_output.output_json_start(self.json_path)
        self.data_output.output_html_headers(self.html_path)
        try:
            while self.url_manager.new_urls_size() != 0:
                new_url = self.url_manager.get_new_url()
                print('正在解析第%d個job_url:%s'%(self.url_manager.old_urls_size(),new_url))
                try:
                    response = self.html_downloader.downloader(new_url)
                    data = self.html_parser.parser(response)
                    # Store as HTML.
                    self.data_output.output_html(data, self.html_path)
                    # Store as plain text.
                    #self.data_output.output_text(data,self.txt_path)
                    # Store as JSON.
                    #self.data_output.output_json(data,self.json_path)
                    # Store in MongoDB.
                    #self.data_output.output_mongodb(data,self.collection)
                except Exception as e:
                    # Use the URL we asked for: `response` is unbound (or left
                    # over from the previous URL) when the download fails.
                    self.url_manager.add_error_urls(new_url)
                    print(e)
        finally:
            # Save crawl progress.
            self.url_manager.save_progress('python_job_old_urls.txt', self.url_manager.old_urls)
            # Was `error_urls != 0`, which compares a set with an int and is
            # always true; only write the file when there is something in it.
            if self.url_manager.error_urls:
                self.url_manager.save_progress('python_job_error_urls.txt',
                                               self.url_manager.error_urls)
            # Close the data store(s).
            self.data_output.output_html_end(self.html_path)
            #self.data_output.output_json_end(self.json_path)
            #self.data_output.close_mongdb()
        print('crawl is over!')
if __name__ == '__main__':
    # Script entry point: optionally resume from saved progress, then crawl.
    crawl = Crawl()
    base_url = 'https://www.liepin.com/zhaopin/?sfrom=click-pc_homepage-centre_searchbox-search_new&d_sfrom=search_fp&key=python'
    manager = crawl.url_manager
    # A saved visited-URL file means a previous run exists: restore both the
    # visited digests and the recorded error URLs before crawling again.
    if os.path.exists('python_job_old_urls.txt'):
        manager.old_urls = manager.load_progress('python_job_old_urls.txt')
        manager.error_urls = manager.load_progress('python_job_error_urls.txt')
    # First collect job links from the listing pages, then fetch each job.
    crawl.crawl_job_urls(base_url)
    crawl.crawl_info()
排程器是五個部分中最重要的一部分,負責協調其他四個部分的工作,並且可以儲存爬取的進度,再次啟動時不會重複爬取之前已爬取過的網頁。
此專案有利於對爬蟲框架的理解,希望能對你們有幫助,謝謝!