1. 程式人生 > 其它 >Python多執行緒實戰

Python多執行緒實戰

目錄
專案的原始碼在GitHub

建立模組

第一步我們可以自己封裝一個模組用來進行多執行緒爬蟲

from threading import Thread  # 繼承Thread方法,重寫run方法


class Spider(Thread):  # 同時重寫多執行緒的run函式

    def __init__(self, url, target) -> "初始化變數":
        super().__init__(target=target, daemon=True)  # daemon執行緒等待,target是執行的函式
        self.target = target

        # # 例項化redis資料庫,如果要使用,請安裝相應外掛後取消註釋
        # import redis
        # self.redis = redis.Redis()  # 在後面的使用中可以進行去重,防止重複下載

        # 構建ip池,防止由於訪問速度過快,使得ip地址被封
        self.file = open("../content/ip.txt")  # 得到大量ip地址
        self.ipList = self.file.readlines() 
        self.file.close()
        from random import choice
        self.ip = choice(self.ipList).strip()

        # # 例項化mongo資料庫
        # import pymongo
        # self.mongo = pymongo.MongoClient()
        # self.clo = self.mongo["python"]["default"]  # 還要說明MongoDB使用的表名

        # 傳入requests所需要的引數
        self.headers = {
            'User-Agent': "Mozilla / 5.0(Windows NT 10.0;Win64;x64) AppleWebKit / 537.36(KHTML, likeGecko) Chrome / 96.0.4664 .93 Safari / 537.36"  # UA偽裝
        }
        self.url = url
        self.proxy = {
            "http": f"http://{self.ip}"  # 代理IP
        }
        self.cookies = None

    def crawl(self) -> "傳送請求":
        """傳送請求"""
        import requests
        try:  # 反止有人不知道設定代理ip
            if self.cookies:
                resp = requests.get(url=self.url, headers=self.headers, proxies=self.proxy, cookies=self.cookies)
            else:
                resp = requests.get(url=self.url, headers=self.headers, proxies=self.proxy)
        except Exception as e:
            print(e)
            if self.cookies:
                resp = requests.get(url=self.url, headers=self.headers, cookies=self.cookies)
            else:
                resp = requests.get(url=self.url, headers=self.headers)
        if resp.status_code == 200:
            resp.encoding = "utf-8"
            return resp
        else:
            print("Requests Error")

    # def spider(self) -> "業務處理":
    #     """業務處理"""
    #     pass
    #
    # def save(self, data=None, name=None, mode="file") -> "持久化儲存":
    #     """持久化儲存"""
    #     pass
    #     if mode == "mongo":  # 當選擇mongo時,儲存在mongo資料庫中
    #         if isinstance(data, dict):
    #             self.clo.insert_one(data)
    #         else:
    #             print("Store Error")
    #     elif mode == "file":  # 儲存在檔案中
    #         with open(f"{self.path}", "a+", encoding="utf-8") as file:
    #             file.write(data)
    #     elif mode == "img" and name:  # 儲存為圖片
    #         with open(f"./{name}", "wb") as f:
    #             f.write(data)
    #     else:
    #         raise 'FileTypeError.The way can only file or mongo'
    #
    # def parse(self) -> "資料分析":
    #     """資料分析"""
    #     pass
    #
    # def run(self) -> "執行程式":
    #     """執行程式"""
    #     from loguru import logger
    #     logger.info("開始執行爬蟲程式")
    #     self.spider()
  • 注意那個寫的python模組也可以在以後的爬蟲中使用,你們也可以根據自己的喜好更改

編寫程式碼

第二步,寫獲取每張圖片的src的程式碼

from spiderModule import Spider  # 匯入我們剛才建立的模組
from lxml import etree  # 對網頁原始碼進行xpath解析
from threading import BoundedSemaphore  # 加入執行緒鎖,限制執行緒的數量
import re, os  # 利用正則匹配,合併網頁,os模組建立要使用的檔案
from loguru import logger  # 進行日誌記錄


class SpiderEveryUrl(Spider):  # 繼承Spider,重寫該方法

    def __init__(self, url):
        """初始化變數"""
        Spider.__init__(self, url, target=self.run)
        self.urlAdd = "https://www.umei.cc/meinvtupian/"  # 進行url的拼接

    def spider(self):
        """處理業務"""
        html = etree.HTML(super().crawl().text)  
        pageUrlList = html.xpath("/html/body/div[2]/div[8]/ul/li/a/@href")
        for i in pageUrlList:
            pageUrl = f"{self.urlAdd}{i.split('/meinvtupian/')[1]}"
            urListAll.append(pageUrl)

    def run(self):
        """啟動程式"""
        self.spider()


class SpiderPicUrl(Spider):

    def __init__(self, url):
        Spider.__init__(self, url, target=self.run)
        self.add = "https://www.umei.cc"

    def spider(self):
        """處理業務"""
        html = etree.HTML(Spider.crawl(self).text)
        nuUrl = html.xpath("/html/body/div[2]/div[12]/a/@href")  
        try:
            if nuUrl:
                nuUrl = nuUrl[-1]
                maxIndex, headersNum, headersAlph = re.search(obj1, nuUrl).group().split("_")[1], re.search(obj2, nuUrl).group(), re.search(obj3, nuUrl).group()
                for i in range(1, int(maxIndex) + 1):
                    if i == 1:
                        eveUrl = f"{self.add}{headersAlph}{headersNum.split('_')[0]}.htm"
                    else:
                        eveUrl = f"{self.add}{headersAlph}{headersNum}{str(i)}.htm"
                    preUrl.append(eveUrl)
            else:
                unRun.append(self.url)
        except Exception as e:
            print(e)

    def run(self):
        """執行程式"""
        with pool_sema:
            self.spider()


class SpiderPicSrc(Spider):

    def __init__(self, url):
        """初始化變數"""
        Spider.__init__(self, url, target=self.run)  # 繼承Spider模組

    def spider(self):
        """處理業務"""
        html = etree.HTML(super(SpiderPicSrc, self).crawl().text)  # 呼叫模組中封裝的方法
        src = html.xpath("//*[@id='ArticleId{dede:field.reid/}']/p/a/img/@src")  # 得到圖片的src
        file = open("../content/PicSrc.txt", "a+")  # 將src寫入檔案
        file.write(f"{src[0]}\n")
        file.close()
        # try:  # 如果安裝了redis可以進行redis去重
        #     if src:
        #         if self.redis.sadd("src", src[0]):  # 使用redis去重
        #             print(f"正在儲存圖片src:{src[0]}")
        #             self.file.write(f"{src[0]}\n")
        #         else:
        #             logger.info(f'{src[0]}已儲存')
        # except Exception as e:
        #     with open("./log.log", 'a+') as file:
        #         file.write(f"{e}\n{src}")
        #     print(e)

    def run(self):
        """執行程式"""
        with pool_sema:
            self.spider()


"""執行緒的使用方法——例項"""
# def Many_Thread(target, *args) -> "示範""其為使用的方式":
#     th = []
#     for i in range(25):  # 開25個執行緒
#         t = threading.Thread(target=target, args=args)
#         th.append(t)
#         t.setDaemon(True)  # 新增守護執行緒,即防止程序進度與執行緒進度不一樣
#     for i in th:  # 迴圈啟動25個執行緒
#         i.start()
#     for i in th:
#         i.join()  # 阻塞執行緒


if __name__ == '__main__':
    while True:
        start, end = input("請輸入要下載該網站的哪部分圖片如(1 3)表示下載1到3頁的圖片,最多有540頁:").split()
        try:
            if isinstance(eval(start), int) and isinstance(eval(end), int) and int(start) <= int(end):
                break
            else:
                continue
        except Exception as e:
            print(e)
            print("請按要求輸入!!!")
    max_connections = 100  # 定義最大執行緒數
    pool_sema = BoundedSemaphore(max_connections) # 或使用Semaphore方法,在主函式中使用with pool_sema可以限制執行緒的數量
    urListAll, threads, preUrl, unRun = [], [], [], []  # 用於儲存url和src
    obj1, obj2, obj3 = re.compile(r"_\d+"), re.compile(r"\d+_"), re.compile(r"\D+")  # 在這裡建立正則表示式,減少快取的佔用
    for i in range(int(start), int(end)+1):
        if i == 1:
            url = "https://www.umei.cc/meinvtupian/"
        else:
            url = f"https://www.umei.cc/meinvtupian/index_{i}.htm"
        logger.info(f"{url}")
        th = SpiderEveryUrl(url)
        threads.append(th)
    for i in threads:
        i.start()
    for i in threads:
        i.join()
    with open("../content/EveryUrl.txt", "w") as f:  # 將提取到的url儲存到EveryUrl中,防止因為意外原因,使得資料丟失
        f.write(str(urListAll))
    print(f"urListAll提取完成")
    threads.clear()

    f = open("../content/EveryUrl.txt", "r")  # 提取檔案中的url
    urList = eval(f.read())
    f.close()
    for url in urListAll: 
        logger.info(url)
        th = SpiderPicUrl(url)
        threads.append(th)
    for i in threads:
        i.start()
    for i in threads:
        i.join()
    with open("../content/PicUrl.txt", "w") as f:  # 將提取到的url儲存到EveryUrl中,防止因為意外原因,使得資料丟失
        f.write(str(preUrl))
    print(f"preUrl提取完成\n錯誤的有:{unRun}" if not unRun else "preUrl提取完成")  # 三目運算
    threads.clear()

    f = open("../content/PicUrl.txt", "r")  # 提取檔案中的url
    urList = eval(f.read())
    f.close()
    for url in preUrl:
        logger.info(f"{url}_src")
        th = SpiderPicSrc(url)
        threads.append(th)
    for i in threads:
        i.start()
    for i in threads:
        i.join()

    print("all over")

下載圖片

第三步,將獲得的src進行訪問,全部下載

from spiderModule import Spider
from loguru import logger
import os, sys
from threading import BoundedSemaphore


class SpiderDown(Spider):

    def __init__(self, url):
        super().__init__(url, target=self.run)

    def spider(self):
        """處理業務"""
        data = Spider.crawl(self).content
        name = self.url.split("/")[-1]  # 給儲存的圖片命名
        logger.info(f"正在下載{name}")
        with open(f"../img/{name}", "wb") as f:
            f.write(data)
        # if self.redis.sadd("imgName", name):  # redis去重
        #     logger.info(f"正在下載{name}")
        #     Spider.save(self, data=data, name=name, mode="img")
        # else:
        #     logger.info(f"{name}已經下載")

    def run(self):
        """執行程式"""
        with pool_sema:  # 使用這個方法,限制執行緒數
            self.spider()


if __name__ == '__main__':
    max_connections = 100  # 定義最大執行緒數
    pool_sema = BoundedSemaphore(max_connections) # 或使用Semaphore方法,在主函式中使用with pool_sema可以限制執行緒的數量
    if not os.path.exists("../img"):
        os.mkdir("../img")
    threads = []
    with open("../content/PicSrc.txt", "r") as file:
        urls = file.readlines()
    for url in urls:
        th = SpiderDown(url.strip())
        threads.append(th)
        th.setDaemon(False)
    for i in threads:
        i.start()
    
    os.remove("../content/PicSrc.txt")  # 移除儲存src的檔案

檢視圖片

第四步,開啟img資料夾,圖片儲存在img資料夾裡面

本文來自部落格園,作者:A-L-Kun,轉載請註明原文連結:https://www.cnblogs.com/liuzhongkun/p/15768416.html