1. 程式人生 > 實用技巧 >從Request到Selenium的簡單爬蟲學習筆記

從Request到Selenium的簡單爬蟲學習筆記

selenium是一個用於網站測試的工具,是一個Web自動化工具,測試人員必會的工具。他可以操作瀏覽器對網頁進行模擬人的操作,比如點選,獲取文字資料,跳轉等等。所以也可以被用來爬蟲。

簡單的網站爬蟲用request就可以實現,但由於反爬蟲技術的出現,對於一些網站使用request就需要更多的技巧去爬資料,而且現在大多數的網站採用js渲染網頁的技術,直接用request獲取可能得到的並不是瀏覽器渲染的網頁,而是一堆帶有js程式碼的html檔案。所以使用selenium操縱瀏覽器去訪問這樣的網站就比單單用request簡單許多,瀏覽器幫助我們解決了很大部分問題,但是隨之而來的就是效率會很慢。瀏覽器開啟網頁會把所有資源都載入進來,包括圖片,css,js。不如request直接載入html就能獲取到想要的資料那麼快。如果技術夠強,並且想要的資料比較大, 還是使用request效率更高。

下載圖片
  • request

    我們下載煎蛋網的meizi圖片,網址url=http://jandan.net/ooxx。

    首先使用request.get(url)訪問網頁,然後使用Beautifusoup解析網頁,獲取網頁你想要的圖片的連結地址(需要通過chrome開發工具F12來了解網頁結構,然後編寫程式碼提取網頁的元素。)。

    然後就是下載圖片,為了在下載過程中使用進度條展示下載進度,可以使用流的方式下載。

    最後就是翻頁,網站不可能一頁就展示所有內容,大多數時刻是需要分頁的,我們找完一頁的資料之後,可以看看是否有下一頁,如果有我們就跳轉到下一頁的連結,否則就結束爬蟲。

    import requests
    import os
    from contextlib import closing
    from bs4 import BeautifulSoup
    
    
    def img_download(folder, img_href, headers):
        if not os.path.exists(folder):
            os.mkdir(folder)
        for src in img_href:
            # 下載時顯示進度條
            with closing(requests.get("http:" + src, headers=headers, stream=True)) as r:
                chunk_size = 1024
                content_size = int(r.headers['content-length'])
                file_name = src.split('/')[-1]
                progress = ProgressBar(file_name, total=content_size, unit="KB", chunk_size=chunk_size, run_status="正在下載", fin_status="下載完成")
                print(r.status_code, src)
                with open(folder + file_name, 'wb') as f:
                    for data in r.iter_content(chunk_size=chunk_size):
                        f.write(data)
                        progress.refresh(count=len(data))
    
    
    class ProgressBar(object):
        def __init__(self, title, count=0.0, run_status=None, fin_status=None, total=100.0, unit='', sep='/', chunk_size=1.0):
            super(ProgressBar, self).__init__()
            self.info = "【%s】%s %.2f %s %s %.2f %s "
            self.title = title
            self.total = total
            self.count = count
            self.chunk_size = chunk_size
            self.status = run_status or ""
            self.fin_status = fin_status or " " * len(self.status)
            self.unit = unit
            self.seq = sep
    
        def __get_info(self):
            # 【名稱】狀態 進度 單位 分割線 總數 單位
            _info = self.info % (self.title, self.status, self.count/self.chunk_size, self.unit, self.seq, self.total/self.chunk_size, self.unit)
            return _info
    
        def refresh(self, count=1, status=None):
            self.count += count
            self.status = status or self.status
            end_str = "\r"
            percent = self.count / self.total
            bar = '*' * int(10 * percent) + '-' * (10 - int(10 * percent))
            if self.count >= self.total:
                end_str = '\n'
                self.status = status or self.fin_status
                bar = '*' * 10
            print(self.__get_info() + bar, end=end_str)
    
            
    def main():
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36'
        }
        url = 'http://jandan.net/ooxx'
        while url is not None:
            # 請求
            r = requests.get(url, headers=headers)
            html = r.text
            # 解析
            soup = BeautifulSoup(html, 'lxml')
            img_div = soup.find_all(class_='text')
            img_href = []
            for d in img_div:
                img = d.find('a', class_='view_img_link')
                img_href.append(img['href'])
            cur = soup.find('span', class_='current-comment-page')
            print('page', cur.text, r.status_code, url)
            # print(img_href)
            # 下載
            folder = "./img/meizi/"
            img_download(folder, img_href, headers)
            # 下一頁
            next_page = soup.find('a', class_='previous-comment-page')
            if next_page is None:
                url = None
            else:
                url = 'http:' + next_page['href']
    
    
    if __name__ == '__main__':
        main()
    
    
  • selenium

    首先需要安裝selenium庫,然後從http://chromedriver.storage.googleapis.com/index.html 下載chromedriver,解壓到符合你檔案管理的路徑下,然後將路徑新增到環境變數中(Windows下), 在cmd或powershell輸入chromedriver如果沒有錯誤就可以了。就可以寫程式碼了。

    我們從host = https://bing.ioliu.cn/下載bing的桌布。通過

    from selenium import webdriver
    profile = webdriver.ChromeOptions()
    driver = webdriver.Chrome(chrome_options=profile)
    driver.get(host)
    

    開啟chrome瀏覽器並訪問網站。其中

    profile = webdriver.ChromeOptions()
    profile.add_experimental_option("prefs", {"download.default_directory": "D:\\Code\\python\\Spider\\img\\bingwallpaper"})
    

    是設定開啟的瀏覽器的預設儲存位置。

    然後我們觀察到這個網頁上的圖片上有下載按鈕,所以我們直接操縱瀏覽器點選下載按鈕即可,但是為了防止下載過於頻繁,我們每次點選下載後會暫停幾秒。driver通過find_elements_by_class_name函式按照class搜尋元素,元素的get_attribute函式可以獲取屬性資訊。

    最後點選下一頁。點選下載按鈕和點選下一頁都需要用到selenium的ActionChains,比如點選下一頁

    from selenium.webdriver.common.action_chains import ActionChains
    next = driver.find_element_by_link_text("下一頁")
    ActionChains(driver).click(next).perform()
    

    完整程式碼

    import time
    import random
    from selenium import webdriver
    from selenium.webdriver.common.action_chains import ActionChains
    from selenium.common.exceptions import NoSuchElementException
    
    
    def selenium_main():
        host = "https://bing.ioliu.cn/"
        profile = webdriver.ChromeOptions()
        profile.add_experimental_option("prefs", {"download.default_directory": "D:\\Code\\python\\Spider\\img\\bingwallpaper"})
        driver = webdriver.Chrome(chrome_options=profile)
        driver.implicitly_wait(10)
        driver.get(host)
        next = [1]
        try:
            while next is not [] or next is not None:
                img_dls = driver.find_elements_by_class_name('download')
                srcs = []
                alts = []
                for img in img_dls:
                    src = img.get_attribute("href")
                    print("src: " + src)
                    srcs.append(src)  # url
                    ActionChains(driver).click(img).perform()
                    time.sleep(random.randint(3, 5))
                next = driver.find_element_by_link_text("下一頁")
                ActionChains(driver).click(next).perform()
                time.sleep(3)
            input()
    
        except NoSuchElementException as e:
            print(e)
        finally:
            driver.close()
    
    
    if __name__ == '__main__':
       # main()  # 會被封IP
        selenium_main()
    
知乎使用者關係(爬取失敗)

爬取知乎使用者關係和上面邏輯差不多,但是知乎這樣的網站肯定有很多辦法應對爬蟲。首先對知乎使用者介面一通分析觀察,我們開始寫程式碼,獲取使用者的一些資訊,然後遍歷使用者的關注和被關注列表,然後將資料以一個自定義的User資料結構儲存。

但是知乎會識別你的瀏覽器是否是被自動測試軟體控制的,所以這裡還需要一些別的方式,參照這裡,我們需要開啟系統的瀏覽器然後用selenium監控我們開啟的瀏覽器。

options = webdriver.ChromeOptions()
# 需要將系統的chrome.exe加入環境變數,並且執行
# chrome.exe --remote-debugging-port=9222 --user-data-dir="D:\Code\python\Spider\selenium-portal"
# 此時開啟系統的chrome,之後用selenium接管瀏覽器,不被網站識別。
options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")  # 接管本機的chrome
self.driver = webdriver.Chrome(chrome_options=options)

如果沒有什麼意外,我們用DFS或BFS都可以搜尋使用者關係資訊,然後構造一個網路進行分析,但意外是會發生的,當你頻繁點選知乎時,知乎會識別出來並給你一個驗證碼讓你填。解決方法可以使用機器學習的方法自動識別驗證碼(還未實現)。目前的程式碼如下

import time
import json
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import NoSuchElementException


class User:
    iid = 0

    def __init__(self, url_id, _type, name='', agreed_num=0, following_ids=[], follower_ids=[]):
        self.id = User.iid
        User.iid += 1
        self.url_id = url_id  # id
        self.name = name
        self.type = _type  # 型別
        self.agreed_num = agreed_num
        self.following_ids = following_ids  # 關注的user的id
        self.follower_ids = follower_ids  # 被關注的user的id

    def __str__(self):
        return 'id:' + str(self.id) + '\t' + str(self.url_id) + '\t' + str(self.type) + '\t' + str(self.agreed_num)


class ZhihuUsership:

    def __init__(self):
        self.url_ids = set()
        options = webdriver.ChromeOptions()
        # 需要將系統的chrome.exe加入環境變數,並且執行
        # chrome.exe --remote-debugging-port=9222 --user-data-dir="D:\Code\python\Spider\selenium-portal"
        # 此時開啟系統的chrome,之後用selenium接管瀏覽器,不被網站識別。
        options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")  # 接管本機的chrome
        self.driver = webdriver.Chrome(chrome_options=options)

    def save(self, root_user, file_name):
        root_user_json = {"id": str(root_user.id), "url_id": str(root_user.url_id), "name": root_user.name,
                          "type": root_user.type, "agree_num": str(root_user.agreed_num),
                          "following_ids": root_user.following_ids,
                          "follower_ids": root_user.follower_ids}
        with open(file_name, "r") as fr:
            before_json = json.load(fr)
            before_json.append(root_user_json)
        with open(file_name, "w") as fw:
            json.dump(before_json, fw)

    def login(self):
        print("Login...(not implemented)")
        pass

    def get_follow(self, root_user, following_url, follower_url):
        # 遍歷following
        followings = []  # 關注的User列表
        following_as = self.driver.find_elements_by_class_name('UserLink-link')  # 關注的人主頁連結元素列表
        following_as = [following_as[i] for i in range(len(following_as)) if i % 2 == 0]  # 同一個有頭像和名字兩個連結,取一個
        # 遍歷關注列表裡的分頁
        while True:
            # 處理連結列表然後生成user列表
            for following_a in following_as:
                href = following_a.get_attribute('href')
                _type = href.split('/')[3]
                _url_id = href.split('/')[4]
                followings.append(User(url_id=_url_id, _type=_type))
            # 點選下一頁
            next_button = self.driver.find_elements_by_class_name('PaginationButton-next')
            if next_button == []:
                break
            next_button = next_button[0]
            ActionChains(self.driver).click(next_button).perform()
            time.sleep(3)
            following_as = self.driver.find_elements_by_class_name('UserLink-link')  # 關注的人主頁連結元素列表
            following_as = [following_as[i] for i in range(len(following_as)) if i % 2 == 0]  # 同一個有頭像和名字兩個連結,取一個
        print(root_user, " following number: ", len(followings))

        # 遍歷followers
        self.driver.get(follower_url)
        followers = []  # 被關注的User列表
        follower_as = self.driver.find_elements_by_class_name('UserLink-link')  # 被關注的人主頁連結元素列表
        follower_as = [follower_as[i] for i in range(len(follower_as)) if i % 2 == 0]
        # 遍歷關注列表裡的分頁
        while True:
            # 處理連結列表然後生成user列表
            for follower_a in follower_as:
                href = follower_a.get_attribute('href')
                _type = href.split('/')[3]
                _url_id = href.split('/')[4]
                followers.append(User(url_id=_url_id, _type=_type))
            # 點選下一頁
            next_button = self.driver.find_elements_by_class_name('PaginationButton-next')
            if next_button == []:
                break
            next_button = next_button[0]
            ActionChains(self.driver).click(next_button).perform()
            time.sleep(3)
            follower_as = self.driver.find_elements_by_class_name('UserLink-link')  # 被關注的人主頁連結元素列表
            follower_as = [follower_as[i] for i in range(len(follower_as)) if i % 2 == 0]
        print(root_user, " follower number: ", len(followers))

        # 獲取following follower的ids, 並新增到root_user的變數中
        followings_id = [u.id for u in followings]
        followers_id = [u.id for u in followers]
        root_user.following_ids = followings_id
        root_user.follower_ids = followers_id
        return followings, followers

    def search_selenium(self, root_user):
        # TODO 模擬登入
        self.login()
        # 訪問當前使用者頁面
        type_ = root_user.type
        url_id = root_user.url_id
        following_url = 'https://www.zhihu.com/' + type_ + '/' + url_id + '/following'  # 關注的
        follower_url = 'https://www.zhihu.com/' + type_ + '/' + url_id + '/followers'  # 被關注的
        self.url_ids.add(url_id)
        self.driver.get(following_url)
        try:
            # 獲取當前使用者資訊
            agreed_divs = self.driver.find_elements_by_class_name('css-vurnku')
            agreed_num = 0
            for agreed_div in agreed_divs:
                if agreed_div.text[:2] == "獲得":
                    agreed_num = int(agreed_div.text.split(' ')[1].replace(',', ''))
                    break
            name_span = self.driver.find_element_by_class_name('ProfileHeader-name')
            name = name_span.text
            root_user.agreed_num = agreed_num  # 獲取點贊數
            root_user.name = name  # 獲取名字
            print(root_user)
            # 獲取關注列表和被關注列表的資訊並將id列表加入root_user
            followings, followers = self.get_follow(root_user, following_url, follower_url)
            # 儲存當前User到json,並儲存到檔案
            self.save(root_user, file_name="./data/zhihu-user.json")
            # 搜尋關注和被關注列表的使用者(深度優先搜尋)
            for u in followings + followers:
                if u.url_id not in self.url_ids:
                    self.search_selenium(u)
        except NoSuchElementException as e:
            print(e)
        finally:
            self.driver.close()


def main():
    z = ZhihuUsership()
    z.search_selenium(User(url_id='jiaze-li', _type='people'))


if __name__ == '__main__':
    main()