
A Simple Distributed Crawler


# URL manager
import pickle
import hashlib


class UrlManager():
    def __init__(self):
        self.new_urls = self.load_progress('new_urls.txt')  # set of URLs not yet crawled
        self.old_urls = self.load_progress('old_urls.txt')  # set of crawled URLs (md5 digests)

    def has_new_url(self):
        '''
        Return True if there are still uncrawled URLs.
        :return:
        '''
        return self.new_url_size() != 0

    def get_new_url(self):
        '''
        Pop an uncrawled URL and record its md5 digest as crawled.
        :return:
        '''
        new_url = self.new_urls.pop()
        m = hashlib.md5()
        m.update(new_url.encode('utf-8'))
        self.old_urls.add(m.hexdigest()[8:-8])
        return new_url

    def add_new_url(self, url):
        '''
        Add a single URL to the uncrawled set if it has not been seen before.
        :param url: a single URL
        :return:
        '''
        if url is None:
            return
        m = hashlib.md5()
        m.update(url.encode('utf-8'))
        url_md5 = m.hexdigest()[8:-8]
        if url not in self.new_urls and url_md5 not in self.old_urls:
            self.new_urls.add(url)

    def add_new_urls(self, urls):
        '''
        Add a collection of URLs to the uncrawled set.
        :param urls: an iterable of URLs
        :return:
        '''
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    def new_url_size(self):
        '''
        Return the number of uncrawled URLs.
        :return:
        '''
        return len(self.new_urls)

    def old_url_size(self):
        '''
        Return the number of crawled URLs.
        :return:
        '''
        return len(self.old_urls)

    def save_progress(self, path, data):
        '''
        Save progress to a local file.
        :param path: file path
        :param data: data to pickle
        :return:
        '''
        with open(path, 'wb') as f:
            pickle.dump(data, f)

    def load_progress(self, path):
        '''
        Load progress from a local file.
        :param path: file path
        :return: a set
        '''
        print('[+] Loading progress from file: %s' % path)
        try:
            with open(path, 'rb') as f:
                return pickle.load(f)
        except Exception:
            print('[!] Invalid file, creating: %s' % path)
            return set()
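
The URL manager deduplicates in two ways: exact membership in new_urls, plus an md5 digest (the middle 16 hex characters) stored in old_urls once a URL has been handed out. A minimal usage sketch, assuming the class above is saved as URLManager.py (the module name the scheduler below imports):

# Usage sketch for UrlManager; 'new_urls.txt' / 'old_urls.txt' are the progress files used in __init__
from URLManager import UrlManager

url_manager = UrlManager()
url_manager.add_new_url('http://baike.baidu.com/view/284853.htm')
url_manager.add_new_url('http://baike.baidu.com/view/284853.htm')  # duplicate, ignored
print(url_manager.new_url_size())  # 1
url = url_manager.get_new_url()    # pops the URL and records its md5 digest in old_urls
print(url_manager.old_url_size())  # 1
url_manager.save_progress('new_urls.txt', url_manager.new_urls)
url_manager.save_progress('old_urls.txt', url_manager.old_urls)
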
# Crawler node (worker)
from multiprocessing.managers import BaseManager
from .HTML_downloader import HtmlDownloader
from .HTML_parser import HtmlParser
class Spiderwork():
    def __init__(self):
        # Register the two queues exposed by the control node's manager
        BaseManager.register('get_task_queue')
        BaseManager.register('get_result_queue')
        server_addr = '127.0.0.1'
        print('connect to %s ....' % server_addr)
        self.m = BaseManager(address=(server_addr, 8001), authkey='baike'.encode('utf-8'))
        self.m.connect()
        self.task = self.m.get_task_queue()
        self.result = self.m.get_result_queue()
        self.downloader = HtmlDownloader()
        self.parser = HtmlParser()
        print('init finished..')

    def crawl(self):
        while True:
            try:
                if not self.task.empty():
                    url = self.task.get()
                    if url == 'end':
                        print('The control node told the crawler node to stop.')
                        # Pass the stop signal on through the result queue
                        self.result.put({'new_urls': 'end', 'data': 'end'})
                        return
                    print('Crawler node is parsing: %s' % url)
                    content = self.downloader.download(url)
                    new_urls, data = self.parser.parser(url, content)
                    self.result.put({'new_urls': new_urls, 'data': data})
            except EOFError as e:
                print('Failed to connect to the worker node')
                return
            except Exception as e:
                print(e)
                print('crawl failed')

if __name__ == '__main__':
    spider = Spiderwork()
    spider.crawl()
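
Because each Spiderwork instance simply connects to the control node's manager at server_addr:8001, more crawling capacity just means starting more copies of this script, on this machine or on others. A small sketch for running several workers locally, assuming the worker module above is saved as Spider_work.py (the file name is an assumption; only the class name appears in the source):

# Sketch: launch three worker nodes on one machine (module name Spider_work is an assumption)
from multiprocessing import Process
from Spider_work import Spiderwork

def run_worker():
    # Each process builds its own Spiderwork, i.e. its own connection to 127.0.0.1:8001
    Spiderwork().crawl()

if __name__ == '__main__':
    workers = [Process(target=run_worker) for _ in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
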
# HTML parser
import re
from urllib import parse
from bs4 import BeautifulSoup

class HtmlParser():

    def parser(self, page_url, html_cont):
        '''
        Parse the page content and extract new URLs and data.
        :param page_url: URL of the downloaded page
        :param html_cont: downloaded page content
        :return: (new_urls, new_data)
        '''
        if page_url is None or html_cont is None:
            return
        soup = BeautifulSoup(html_cont, 'html.parser')
        new_urls = self._get_new_urls(page_url, soup)
        new_data = self._get_new_data(page_url, soup)

        return new_urls, new_data

    def _get_new_urls(self, page_url, soup):
        '''
        Extract the set of new URLs.
        :param page_url: URL of the downloaded page
        :param soup: BeautifulSoup object
        :return: set of absolute URLs
        '''
        new_urls = set()
        # Extract the <a> tags whose href matches the entry pattern
        links = soup.find_all('a', href=re.compile(r'/item/.'))
        for link in links:
            # Get the href attribute
            new_url = link['href']
            # Join it with the page URL to build an absolute URL
            new_full_url = parse.urljoin(page_url, new_url)
            new_urls.add(new_full_url)

        return new_urls

    def _get_new_data(self, page_url, soup):
        '''
        Extract the useful data.
        :param page_url: URL of the downloaded page
        :param soup: BeautifulSoup object
        :return: dict with url, title and summary
        '''
        data = {}
        data['url'] = page_url
        title = soup.find('dd', class_='lemmaWgt-lemmaTitle-title').find('h1')
        data['title'] = title.text
        summary = soup.find('div', class_='lemma-summary')
        # .text returns all text contained in the tag
        data['summary'] = summary.text
        return data
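
To see what the parser produces, here is a minimal sketch using a hand-written snippet shaped like a Baidu Baike entry page (the markup below is illustrative, not the real site's HTML), assuming the HtmlParser class above is in scope:

# Sketch: run the parser on a tiny made-up page
sample_html = '''
<html><body>
  <dd class="lemmaWgt-lemmaTitle-title"><h1>Python</h1></dd>
  <div class="lemma-summary">Python is a programming language.</div>
  <a href="/item/Guido">Guido</a>
</body></html>
'''
new_urls, data = HtmlParser().parser('http://baike.baidu.com/item/Python', sample_html)
print(new_urls)         # {'http://baike.baidu.com/item/Guido'}
print(data['title'])    # Python
print(data['summary'])  # Python is a programming language.
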
# HTML downloader
import requests

class HtmlDownloader():

    def download(self,url):
        if url is None:
            return None
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel …) Gecko/20100101 Firefox/57.0'}
        r = requests.get(url, headers=headers)
        if r.status_code == 200:
            r.encoding = 'utf-8'
            return r.text
        return None
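
The downloader is a thin wrapper around requests; a quick sketch of it in use (the URL is the seed the scheduler below uses, and network access is required):

# Sketch: fetch one page and report its size, assuming HtmlDownloader above is in scope
downloader = HtmlDownloader()
html = downloader.download('http://baike.baidu.com/view/284853.htm')
print(len(html) if html is not None else 'download failed')
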
# Data output (storage)
import codecs
import time


class DataOutput():
    def __init__(self):
        self.filepath = 'baike_%s.html' % time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime())
        self.output_head(self.filepath)
        self.datas = []

    def store_data(self, data):
        if data is None:
            return
        self.datas.append(data)
        # Write to disk in batches to keep the in-memory buffer small
        if len(self.datas) > 10:
            self.output_html(self.filepath)
    def output_head(self, path):
        '''
        Write the HTML header.
        :param path: output file path
        :return:
        '''
        fout = codecs.open(path, 'w', encoding='utf-8')
        fout.write('<html>')
        fout.write('<body>')
        fout.write('<table>')
        fout.close()

    def output_html(self, path):
        '''
        Append the buffered data rows to the HTML file.
        :return:
        '''
        fout = codecs.open(path, 'a', encoding='utf-8')
        # Iterate over a copy so that removing items does not skip entries
        for data in self.datas[:]:
            fout.write('<tr>')
            fout.write('<td>%s</td>' % data['url'])
            fout.write('<td>%s</td>' % data['title'])
            fout.write('<td>%s</td>' % data['summary'])
            fout.write('</tr>')
            self.datas.remove(data)
        # The closing tags are written once, by output_end()
        fout.close()

    def output_end(self, path):
        '''
        Flush any remaining rows and write the HTML footer.
        :param path: output file path
        :return:
        '''
        self.output_html(path)
        fout = codecs.open(path, 'a', encoding='utf-8')
        fout.write('</table>')
        fout.write('</body>')
        fout.write('</html>')
        fout.close()
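
A minimal sketch of the output flow, assuming the DataOutput class above is in scope; the record is made up for illustration:

# Sketch: store one made-up record and close the report
output = DataOutput()
output.store_data({'url': 'http://baike.baidu.com/item/Python',
                   'title': 'Python',
                   'summary': 'Python is a programming language.'})
output.output_end(output.filepath)  # flushes the remaining rows and writes the footer
print('report written to', output.filepath)
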
# Control node (scheduler)
import random,time,queue
from multiprocessing.managers import BaseManager
from multiprocessing import Process
from .URLManager import UrlManager
from .Data_store import DataOutput


class NodeManager():

    def start_Manager(self, url_q, result_q):
        '''
        Create a distributed manager.
        :param url_q: URL (task) queue
        :param result_q: result queue
        :return: the manager object
        '''
        # Expose the two queues over the network so that worker nodes can fetch them
        BaseManager.register('get_task_queue', callable=lambda: url_q)
        BaseManager.register('get_result_queue', callable=lambda: result_q)
        # Listen on port 8001 on all interfaces; authkey must match the worker nodes
        manager = BaseManager(address=('', 8001), authkey='baike'.encode('utf-8'))
        return manager

    def url_manager_proc(self,url_q,conn_q,root_url):
        url_manager = UrlManager()
        url_manager.add_new_url(root_url)  # root_url is a single seed URL
        while True:
            while url_manager.has_new_url():
                # Get a new URL from the URL manager
                new_url = url_manager.get_new_url()
                # Send the new URL to the worker nodes
                url_q.put(new_url)
                print('old_url=', url_manager.old_url_size())
                # Stop and save progress once 2000 links have been crawled
                if url_manager.old_url_size() > 2000:
                    url_q.put('end')
                    print('The control node sent the stop notification')
                    # Shut down the manager node and persist both sets
                    url_manager.save_progress('new_urls.txt', url_manager.new_urls)
                    url_manager.save_progress('old_urls.txt', url_manager.old_urls)
                    return
            try:
                if not conn_q.empty():
                    urls = conn_q.get()
                    url_manager.add_new_urls(urls)
            except BaseException as e:
                time.sleep(0.1)

    def result_solve_proc(self,result_q,conn_q,store_q):
        while True:
            try:
                if not result_q.empty():
                    content = result_q.get(True)
                    if content['new_urls'] == 'end':
                        # The result-handling process received the stop notification
                        print('The result-handling process received the stop notification and will exit')
                        store_q.put('end')
                        return
                    conn_q.put(content['new_urls'])  # a set of new URLs
                    store_q.put(content['data'])     # the parsed data, a dict
                else:
                    time.sleep(0.1)
            except BaseException as e:
                time.sleep(0.1)

    def store_proc(self,store_q):
        output = DataOutput()
        while True:
            if not store_q.empty():
                data = store_q.get()
                if data == 'end':
                    print('The storage process received the stop notification and will exit')
                    output.output_end(output.filepath)
                    return
                output.store_data(data)
            else:
                time.sleep(0.1)
if __name__ == '__main__':
    # Initialise the four queues
    url_q = queue.Queue()
    result_q = queue.Queue()
    store_q = queue.Queue()
    conn_q = queue.Queue()
    # Create the distributed manager
    node = NodeManager()
    manager = node.start_Manager(url_q,result_q)
    # Create the URL-manager, result-handling and data-storage processes
    url_manager_proc = Process(target=node.url_manager_proc, args=(url_q, conn_q, 'http://baike.baidu.com/view/284853.htm'))
    result_solve_proc = Process(target=node.result_solve_proc,args=(result_q,conn_q,store_q))
    store_proc = Process(target=node.store_proc,args=(store_q,))
    # Start the three processes and the distributed manager
    url_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()
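
The intended run order: start this control node first (it binds to port 8001 and serves the task and result queues), then start one or more worker nodes, with Spiderwork's server_addr changed from 127.0.0.1 to the control node's address when the workers run on other machines. A small sketch for checking that a remote worker can reach the control node (the IP 192.168.1.100 is an example address, not from the source):

# Sketch: verify connectivity to the control node from another machine
from multiprocessing.managers import BaseManager

BaseManager.register('get_task_queue')
BaseManager.register('get_result_queue')
m = BaseManager(address=('192.168.1.100', 8001), authkey='baike'.encode('utf-8'))  # example IP
m.connect()
print('tasks waiting:', m.get_task_queue().qsize())
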
