簡單分布式爬蟲
阿新 • 發佈:2017-10-24
size 下載 pat 關閉 and ict 一個 htm pid
# url管理器
# URL manager
import pickle
import hashlib


class UrlManager():
    """Track which URLs still need crawling and which have already been crawled.

    Uncrawled URLs are kept verbatim in ``new_urls``. Crawled URLs are kept
    only as the middle 16 hex characters of their MD5 digest in
    ``old_urls``. Both sets are restored from pickle files on construction
    and can be written back with ``save_progress`` so a crawl can resume.
    """

    def __init__(self):
        self.new_urls = self.load_progress('new_urls.txt')  # set of URLs not yet crawled
        self.old_urls = self.load_progress('old_urls.txt')  # set of digests of crawled URLs

    @staticmethod
    def _url_digest(url):
        """Return the middle 16 hex chars of the MD5 of *url* (UTF-8 encoded)."""
        # hashlib only accepts bytes in Python 3; passing the str directly
        # raised TypeError.
        return hashlib.md5(url.encode('utf-8')).hexdigest()[8:-8]

    def has_new_url(self):
        """Return True if at least one uncrawled URL is pending."""
        return self.new_url_size() != 0

    def get_new_url(self):
        """Pop an arbitrary uncrawled URL, record it as crawled, and return it.

        :raises KeyError: if there are no uncrawled URLs.
        """
        new_url = self.new_urls.pop()
        self.old_urls.add(self._url_digest(new_url))
        return new_url

    def add_new_url(self, url):
        """Queue a single URL unless it is None, already queued, or already crawled.

        :param url: one URL string
        """
        if url is None:
            return
        if url not in self.new_urls and self._url_digest(url) not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        """Queue every URL in *urls*.

        :param urls: an iterable of URL strings. A single string is treated
                     as one URL instead of being iterated character by
                     character. None or an empty collection is a no-op.
        """
        if urls is None:
            return
        if isinstance(urls, str):
            urls = (urls,)
        for url in urls:
            self.add_new_url(url)

    def new_url_size(self):
        """Return the number of uncrawled URLs."""
        return len(self.new_urls)

    def old_url_size(self):
        """Return the number of crawled URLs."""
        return len(self.old_urls)

    def save_progress(self, path, data):
        """Pickle *data* to *path*, overwriting any existing file.

        :param path: file path
        :param data: object to save (normally one of the URL sets)
        """
        with open(path, 'wb') as f:
            pickle.dump(data, f)

    def load_progress(self, path):
        """Load saved progress from *path*.

        :param path: file path
        :return: the unpickled object, or an empty set if the file is
                 missing or unreadable.
        """
        print('[+] 從文件加載進度:%s' % path)
        try:
            # SECURITY: pickle.load can execute arbitrary code; only load
            # progress files this program wrote itself.
            with open(path, 'rb') as f:
                return pickle.load(f)
        except Exception:
            # Best effort: any missing or corrupt file starts fresh. This is
            # narrower than a bare except, so Ctrl-C still propagates.
            print('[!] 文件無效,創建:%s' % path)
            return set()
# Crawler (worker) node
import time
from multiprocessing.managers import BaseManager

from .HTML_downloader import HtmlDownloader
from .HTML_parser import HtmlParser


class Spiderwork():
    """Worker node: pull URLs from the control node, download and parse them, report back."""

    def __init__(self):
        # Only the accessor names are registered here; the queues themselves
        # live in the control node's manager process.
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        server_addr = '127.0.0.1'
        print('connect to %s ....' % server_addr)
        self.m = BaseManager(address=(server_addr, 8001), authkey='baike'.encode('utf-8'))
        self.m.connect()
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print('init finshed..')

    def crawl(self):
        """Process tasks until the control node sends 'end' or the connection drops."""
        while True:
            try:
                if self.task.empty():
                    # Avoid spinning at 100% CPU while waiting for work.
                    time.sleep(0.1)
                    continue
                url = self.task.get()
                if url == 'end':
                    print('控制節點通知爬蟲節點停止工作。')
                    # Forward the sentinel so the control node's result
                    # pipeline shuts down as well.
                    self.result.put({'new_urls': 'end', 'data': 'end'})
                    return
                print('爬蟲節點正在解析:%s' % url)
                content = self.downloader.download(url)
                if content is None:
                    # Download failed (e.g. non-200); nothing to parse.
                    continue
                new_urls, data = self.parser.parser(url, content)
                # Report the extracted links, not the page URL itself: the
                # control node feeds 'new_urls' to UrlManager.add_new_urls,
                # which expects a collection of URLs.
                self.result.put({'new_urls': new_urls, 'data': data})
            except EOFError:
                # The manager connection was closed.
                print('鏈接工作節點失敗')
                return
            except Exception as e:
                # Best effort: log and move on to the next task.
                print(e)
                print('crawl fial')


if __name__ == '__main__':
    spider = Spiderwork()
    spider.crawl()
# HTML parser
import re
from urllib import parse

from bs4 import BeautifulSoup


class HtmlParser():
    """Extract entry links and title/summary data from a Baidu Baike page."""

    # Anchors whose href contains '/item/' followed by at least one character.
    _ITEM_LINK = re.compile(r'/item/.')

    def parser(self, page_url, html_cont):
        """Parse a downloaded page and extract URLs and data.

        :param page_url: URL the page was downloaded from; relative links
                         are resolved against it
        :param html_cont: the downloaded page content
        :return: ``(new_urls, data)``. ``new_urls`` is a set of absolute
                 URLs; ``data`` is a dict with 'url', 'title' and
                 'summary'. Returns ``(set(), None)`` if either argument
                 is None.
        """
        if page_url is None or html_cont is None:
            # Callers unpack two values; a bare ``return`` (None) made them
            # raise TypeError.
            return set(), None
        soup = BeautifulSoup(html_cont, 'html.parser')
        new_urls = self._get_new_urls(page_url, soup)
        new_data = self._get_new_data(page_url, soup)
        return new_urls, new_data

    def _get_new_urls(self, page_url, soup):
        """Return the set of absolute URLs of matching <a> tags.

        :param page_url: URL of the downloaded page
        :param soup: parsed page
        """
        # The href filter guarantees the attribute exists on each match.
        return {parse.urljoin(page_url, link['href'])
                for link in soup.find_all('a', href=self._ITEM_LINK)}

    def _get_new_data(self, page_url, soup):
        """Return ``{'url', 'title', 'summary'}`` for the page.

        Missing title or summary elements yield empty strings instead of
        raising AttributeError.

        :param page_url: URL of the downloaded page
        :param soup: parsed page
        """
        data = {'url': page_url}
        title_dd = soup.find('dd', class_='lemmaWgt-lemmaTitle-title')
        title = title_dd.find('h1') if title_dd is not None else None
        data['title'] = title.text if title is not None else ''
        summary = soup.find('div', class_='lemma-summary')
        # .text gathers all text contained in the tag.
        data['summary'] = summary.text if summary is not None else ''
        return data
# HTML downloader
import requests


class HtmlDownloader():
    """Fetch pages over HTTP with requests."""

    def download(self, url, timeout=10):
        """Download *url* and return its body decoded as UTF-8.

        :param url: page URL; None returns None
        :param timeout: seconds passed to requests as the connect/read
                        timeout, so a stalled server cannot hang the worker
                        forever (the original call had no timeout)
        :return: the page text, or None if the status code is not 200
        :raises requests.RequestException: on network errors and timeouts
        """
        if url is None:
            return None
        # NOTE(review): this UA string looks truncated ("Intel …") by the
        # page it was copied from; confirm the intended value.
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel …) Gecko/20100101 Firefox/57.0'.encode('utf-8')}
        r = requests.get(url, headers=headers, timeout=timeout)
        if r.status_code != 200:
            return None
        # Force UTF-8 rather than relying on requests' header-based guess.
        r.encoding = 'utf-8'
        return r.text
# Data store
import codecs
import html
import time


class DataOutput():
    """Buffer crawled records and write them as rows of an HTML table.

    Each record is a dict with 'url', 'title' and 'summary' keys. The output
    file is baike_<local timestamp>.html in the current directory. The head
    is written on construction and the tail by ``output_end``.
    """

    def __init__(self):
        self.filepath = 'baike_%s.html' % (time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime()))
        self.output_head(self.filepath)
        self.datas = []  # records buffered but not yet written to disk

    def store_data(self, data):
        """Buffer *data*; flush to disk once more than 10 records are pending.

        :param data: record dict; None is ignored
        """
        if data is None:
            return
        self.datas.append(data)
        if len(self.datas) > 10:
            self.output_html(self.filepath)

    def output_head(self, path):
        """Create or truncate *path* and write the opening HTML tags.

        :param path: output file path
        """
        with codecs.open(path, 'w', encoding='utf-8') as fout:
            # <html><body><table>; the original opened <table> twice, which
            # nests a table directly inside a table (invalid HTML).
            fout.write('<html>')
            fout.write('<body>')
            fout.write('<table>')

    def output_html(self, path):
        """Append every buffered record to *path* as a table row, then clear the buffer.

        Closing tags are NOT written here (see ``output_end``), so the file
        stays well-formed across multiple flushes.

        :param path: output file path
        """
        with codecs.open(path, 'a', encoding='utf-8') as fout:
            for data in self.datas:
                fout.write('<tr>')
                # Values come from crawled pages, so escape them before
                # embedding them in HTML.
                fout.write('<td>%s</td>' % html.escape(str(data['url'])))
                fout.write('<td>%s</td>' % html.escape(str(data['title'])))
                fout.write('<td>%s</td>' % html.escape(str(data['summary'])))
                fout.write('</tr>')
        # Clear after the loop; the original removed items while iterating,
        # which skipped every other record.
        self.datas.clear()

    def output_end(self, path):
        """Flush any buffered records, then append the closing HTML tags.

        :param path: output file path
        """
        if self.datas:
            self.output_html(path)
        with codecs.open(path, 'a', encoding='utf-8') as fout:
            fout.write('</table>')
            fout.write('</body>')
            fout.write('</html>')
# Control node / scheduler
import queue
import random
import time
from multiprocessing.managers import BaseManager
from multiprocessing import Process, Queue

from .URLManager import UrlManager
from .Data_store import DataOutput


class NodeManager():
    """Control node: feed URLs to worker nodes and process the results they return."""

    def start_Manager(self, url_q, result_q):
        """Create (but do not start) the distributed manager.

        The manager serves the two queues on port 8001 of all interfaces.

        :param url_q: queue of URLs for worker nodes
        :param result_q: queue of results from worker nodes
        :return: the BaseManager instance
        """
        BaseManager.register('get_task_queue', callable=lambda: url_q)
        BaseManager.register('get_result_queue', callable=lambda: result_q)
        manager = BaseManager(address=('', 8001), authkey='baike'.encode('utf-8'))
        return manager

    def url_manager_proc(self, url_q, conn_q, root_url):
        """Hand out URLs to workers and absorb newly discovered URLs from conn_q.

        Stops after more than 2000 URLs have been handed out. On stopping it
        sends 'end' to the workers and saves both URL sets to disk.

        :param url_q: task queue shared with the workers
        :param conn_q: queue of URL collections from result_solve_proc
        :param root_url: start URL (a string) or an iterable of start URLs
        """
        url_manager = UrlManager()
        # add_new_urls expects a collection; a bare string would otherwise
        # be iterated character by character.
        if isinstance(root_url, str):
            root_url = [root_url]
        url_manager.add_new_urls(root_url)
        while True:
            while url_manager.has_new_url():
                # Take a new URL from the URL manager and send it to the workers.
                new_url = url_manager.get_new_url()
                url_q.put(new_url)
                print('old_url=', url_manager.old_url_size())
                # Stop and save once more than 2000 links have been handed out.
                if url_manager.old_url_size() > 2000:
                    url_q.put('end')
                    print('控制節點發起結束通知')
                    url_manager.save_progress('new_urls.txt', url_manager.new_urls)
                    url_manager.save_progress('old_urls.txt', url_manager.old_urls)
                    return
            try:
                # Block briefly instead of busy-spinning on empty().
                urls = conn_q.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                url_manager.add_new_urls(urls)
            except Exception as e:
                # Best effort: one malformed batch must not stop the scheduler.
                print(e)

    def result_solve_proc(self, result_q, conn_q, store_q):
        """Split worker results into new URLs (-> conn_q) and data (-> store_q).

        Forwards 'end' to store_q and returns when a worker reports 'end'.
        """
        while True:
            try:
                content = result_q.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                if content['new_urls'] == 'end':
                    # Termination notice: pass it on and exit.
                    print('結果分析進程接收通知然後結束')
                    store_q.put('end')
                    return
                conn_q.put(content['new_urls'])  # collection of URLs
                store_q.put(content['data'])  # parsed record dict
            except Exception as e:
                # Best effort: skip malformed results, but do not swallow
                # KeyboardInterrupt/SystemExit as the original BaseException did.
                print(e)

    def store_proc(self, store_q):
        """Write records from store_q to an HTML file until 'end' arrives."""
        output = DataOutput()
        while True:
            try:
                data = store_q.get(timeout=0.1)
            except queue.Empty:
                continue
            if data == 'end':
                print('存儲進程接受通知然後結束')
                output.output_end(output.filepath)
                return
            output.store_data(data)


if __name__ == '__main__':
    # Create the four queues. They must be multiprocessing.Queue: a
    # queue.Queue is merely copied into each child process, so the processes
    # never saw each other's items.
    url_q = Queue()
    result_q = Queue()
    store_q = Queue()
    conn_q = Queue()
    # Create the distributed manager.
    node = NodeManager()
    manager = node.start_Manager(url_q, result_q)
    # Create the URL-manager, result-processing and storage processes.
    url_manager_proc = Process(target=node.url_manager_proc, args=(url_q, conn_q, 'http://baike.baidu.com/view/284853.htm'))
    result_solve_proc = Process(target=node.result_solve_proc, args=(result_q, conn_q, store_q))
    store_proc = Process(target=node.store_proc, args=(store_q,))
    # Start the three processes, then serve the queues to worker nodes.
    url_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()
簡單分布式爬蟲