1. 程式人生 > >多線程爬蟲案例

多線程爬蟲案例

阻塞 dna out web 引用 意圖 一個數 adding tro

多線程糗事百科案例

案例要求參考上一個糗事百科單進程案例

Queue(隊列對象)

Queue是python中的標準庫,可以直接import Queue引用;隊列是線程間最常用的交換數據的形式

python下多線程的思考

對於資源,加鎖是個重要的環節。因為python原生的list,dict等,都是not thread safe的。而Queue,是線程安全的,因此在滿足使用條件下,建議使用隊列

  1. 初始化: class Queue.Queue(maxsize) FIFO 先進先出

  2. 包中的常用方法:

    • Queue.qsize() 返回隊列的大小

    • Queue.empty() 如果隊列為空,返回True,反之False

    • Queue.full() 如果隊列滿了,返回True,反之False

    • Queue.full 與 maxsize 大小對應

    • Queue.get([block[, timeout]])獲取隊列,timeout等待時間

  3. 創建一個“隊列”對象

    • import Queue
    • myqueue = Queue.Queue(maxsize = 10)
  4. 將一個值放入隊列中

    • myqueue.put(10)
  5. 將一個值從隊列中取出

    • myqueue.get()

多線程示意圖

技術分享圖片

import threading
from queue import Queue
from lxml import etree
import requests
import json
import time


class ThreadCrawl(threading.Thread):
    def __init__(self, threadName, pageQueue, dataQueue):
        # 調用父類的初始化方法
        # threading.Thread.__init__(self)
        super(ThreadCrawl, self).__init__()
        # 線程的名字
        self.threadName = threadName
        # 頁碼隊列
        self.pageQueue = pageQueue
        # 數據隊列
        self.dataQueue = dataQueue
        # 請求報頭
        self.headers = {
            "Uaer-Agent": "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
        }

    def run(self):
        print(‘啟動:{}‘.format(self.threadName))
        while not CRAWL_EXIT:
            try:
                # 取出隊列中的一個數字, 先進先出
                # 可選參數block, 默認值為True, 兩種用法
                # 1. 如果隊列為空, block為True的話, 不會結束, 就會進入阻塞狀態, 知道隊列有新的數據
                # 2. 如果隊列為空, block為False的話, 就會彈出一個Queue.empty()異常
                page = self.pageQueue.get(False)
                url = ‘http://www.qiushibaike.com/8hr/page/{}/‘.format(page)
                content = requests.get(url, headers=self.headers).text
                # content = content.decode(‘utf-8‘)
                # print(content)
                time.sleep(1)
                self.dataQueue.put(content)
            except:
                pass
        print(‘結束:{}‘.format(self.threadName))


class ThreadParse(threading.Thread):
    def __init__(self, threadName, dataQueue, filename, lock):
        super(ThreadParse, self).__init__()
        # 線程的名字
        self.threadName = threadName
        # 數據隊列
        self.dataQueue = dataQueue
        # 保存解析後數據的文件名
        self.filename = filename
        # 鎖
        self.lock = lock

    def run(self):
        print(‘啟動:{}‘.format(self.threadName))
        while not PARSE_EXIT:
            try:
                html = self.dataQueue.get(False)
                self.parse(html)
            except:
                pass
        print(‘結束:{}‘.format(self.threadName))

    def parse(self, html):
        # 解析為HTML DOM
        html = etree.HTML(html)
        node_list = html.xpath(‘//div[contains(@class, "article block untagged mb15")]‘)
        for node in node_list:
            # print(node)
            # xpath返回的列表,這個列表就這一個參數,用索引方式取出來,用戶名
            # .//h2  用戶名
            username = node.xpath(‘.//h2‘)[0].text.strip()
            # print(‘username==={}‘.format(username))
            # .//div[@class="thumb"]//@src  圖片連接
            image = node.xpath(‘.//div[@class="thumb"]//@src‘)
            # print(‘image==={}‘.format(image))
            # .//div[@class="content"]/span    取出標簽下的內容,段子內容
            content = node.xpath(‘.//div[@class="content"]/span‘)[0].text.strip()
            # print(‘content==={}‘.format(content))
            # .//i[@class="number"][0] 點贊  取出標簽裏包含的內容,點贊
            zan = node.xpath(‘.//i[@class="number"]‘)[0].text
            # print(‘zan==={}‘.format(zan))
            # .//i[@class="number"][i] 評論
            comments = node.xpath(‘.//i[@class="number"]‘)[1].text
            # print(‘comments==={}‘.format(comments))
            items = {
                "username" : username,
                "image" : image,
                "content" : content,
                "zan" : zan,
                "comments" : comments
            }

            # with 後面有兩個必須執行的操作:__enter__ 和 _exit__
            # 不管裏面的操作結果如何,都會執行打開、關閉
            # 打開鎖、處理內容、釋放鎖
            # print("正在寫入內容!!!")
            with self.lock:
                # print("正在寫入內容!!!")
                # # 寫入存儲的解析後的數據
                # json_data = json.dumps(items, ensure_ascii=False)
                # print(‘jsondata==={}‘.format(json_data))
                # self.filename.write(json_data.encode("utf-8") + "\n")
                self.filename.write(str(json.dumps(items, ensure_ascii = False))+‘\n‘)
                print("寫入完成!!!")


# 采集爬蟲退出信號
CRAWL_EXIT = False
# 解析爬蟲退出信號
PARSE_EXIT = False


def main():
    # 頁碼的隊列, 表示10個頁面
    pageQueue = Queue(10)
    # 放入1~10個數字, 先進先出
    for i in range(1, 11):
        pageQueue.put(i)

    # 采集結果(每頁的html源碼)的數據隊列, 參數為空表示不限制
    dataQueue = Queue()

    filename = open(‘duanzi.json‘, ‘a‘, encoding=‘utf-8‘)
    print("打開文件!!!")

    # 創建鎖
    lock = threading.Lock()

    # 三個采集線程的名字
    crawlList = ["采集線程1號", "采集線程2號", "采集線程3號"]

    # 存儲三個采集線程的列表集合
    threadcrawl = []

    for threadName in crawlList:
        thread = ThreadCrawl(threadName, pageQueue, dataQueue)
        thread.start()
        threadcrawl.append(thread)
        time.sleep(1)

    # 三個解析線程的名字
    parseList = ["解析線程1號", "解析線程2號", "解析線程3號"]

    # 存儲三個解析線程
    threadparse = []

    for threadName in parseList:
        thread = ThreadParse(threadName, dataQueue, filename, lock)
        thread.start()
        threadparse.append(thread)

    # 等待pageQueue隊列為空, 等待之前的操作執行完畢
    while not pageQueue.empty():
        pass

    # 如果pageQueue為空, 采集線程退出循環
    global CRAWL_EXIT
    CRAWL_EXIT = True

    print(‘pageQueue隊列為空‘)

    for thread in threadcrawl:
        thread.join()
        print("1")

    while not dataQueue.empty():
        pass

    global PARSE_EXIT
    PARSE_EXIT = True

    for thread in threadparse:
        thread.join()
        print("2")

    with lock:
        print("關閉文件!!!")
        # 關閉文件
        filename.close()
    print("謝謝使用")


if __name__ == ‘__main__‘:
    main()

  

多線程爬蟲案例