A Multi-threaded Crawler Example
阿新 · Published 2018-12-19
Queue (the queue object)
Queue is part of the Python standard library; in Python 3 it is imported with from queue import Queue (in Python 2 the module was named Queue). A queue is the most common way to exchange data between threads.
Some thoughts on multithreading in Python
Locking shared resources is essential, because Python's built-in list, dict, and similar containers are not thread safe. Queue, on the other hand, is thread safe, so wherever a queue fits the use case, it is the recommended choice.
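To make that difference concrete, here is a minimal sketch (all names are illustrative, not from the original post): several worker threads drain one shared queue.Queue with no explicit lock around the queue itself, while the plain list they write results into still needs one.

```python
import threading
from queue import Queue

q = Queue()              # put()/get() do their own locking internally
results = []
results_lock = threading.Lock()  # a plain list is not thread safe, so lock it

def worker():
    while True:
        n = q.get()
        with results_lock:
            results.append(n * n)
        q.task_done()    # one task_done() for every get()

for _ in range(2):
    threading.Thread(target=worker, daemon=True).start()

for n in range(10):
    q.put(n)

q.join()                 # returns once every put() has been matched by a task_done()
print(sorted(results))
```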
Initialization: queue.Queue(maxsize) creates a FIFO (first-in, first-out) queue.

Commonly used methods:

- Queue.qsize() returns the approximate size of the queue
- Queue.empty() returns True if the queue is empty, otherwise False
- Queue.full() returns True if the queue is full, otherwise False
- Queue.full() is governed by the maxsize passed at construction
- Queue.get([block[, timeout]]) removes and returns an item from the queue; timeout is the maximum time to wait

Create a "queue" object (Python 3 spelling; in Python 2 the module was capitalized Queue):

- from queue import Queue
- myqueue = Queue(maxsize=10)

Put a value into the queue:

- myqueue.put(10)

Take a value out of the queue:

- myqueue.get()

All of these calls are exercised together in the sketch below.
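A short, self-contained demonstration (Python 3; queue.Empty is the standard exception raised when a blocking get() times out):

```python
from queue import Queue, Empty

myqueue = Queue(maxsize=2)
print(myqueue.empty())   # True: nothing enqueued yet

myqueue.put(10)
myqueue.put(20)
print(myqueue.full())    # True: maxsize=2 has been reached
print(myqueue.qsize())   # 2

print(myqueue.get())     # 10  (FIFO: first in, first out)
print(myqueue.get())     # 20

try:
    # The queue is now empty; wait at most 0.1 s instead of blocking forever.
    myqueue.get(block=True, timeout=0.1)
except Empty:
    print("queue empty: get() timed out")
```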
Example code:
```python
import json
import threading
from queue import Queue

import requests
from lxml import etree


class Qiubai:

    def __init__(self):
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/58.0.3029.110 Safari/537.36"}
        # Instantiate three queues, one for each stage of the pipeline.
        self.url_queue = Queue()
        self.html_queue = Queue()
        self.content_queue = Queue()

    def get_total_url(self):
        """Generate the URL of every listing page and enqueue it."""
        url_temp = 'https://www.qiushibaike.com/8hr/page/{}/'
        for i in range(1, 36):
            self.url_queue.put(url_temp.format(i))

    def parse_url(self):
        """Send a request, fetch the response, and parse the HTML with etree."""
        while True:  # daemon thread; the main thread's queue.join() controls shutdown
            url = self.url_queue.get()
            print("parsing url:", url)
            response = requests.get(url, headers=self.headers, timeout=10)
            html_str = response.content.decode()  # raw HTML string
            html = etree.HTML(html_str)           # element-type HTML document
            self.html_queue.put(html)
            self.url_queue.task_done()

    def get_content(self):
        """Extract every joke on one page into a list of dicts."""
        while True:
            html = self.html_queue.get()
            # One div per joke on the page.
            total_div = html.xpath('//div[@class="article block untagged mb15"]')
            items = []
            for i in total_div:  # walk the div tags, collecting each joke's fields
                author_img = i.xpath('./div[@class="author clearfix"]/a[1]/img/@src')
                author_img = "https:" + author_img[0] if len(author_img) > 0 else None
                author_name = i.xpath('./div[@class="author clearfix"]/a[2]/h2/text()')
                author_name = author_name[0] if len(author_name) > 0 else None
                author_href = i.xpath('./div[@class="author clearfix"]/a[1]/@href')
                author_href = "https://www.qiushibaike.com" + author_href[0] if len(author_href) > 0 else None
                author_gender = i.xpath('./div[@class="author clearfix"]//div/@class')
                author_gender = author_gender[0].split(" ")[-1].replace("Icon", "") if len(author_gender) > 0 else None
                author_age = i.xpath('./div[@class="author clearfix"]//div/text()')
                author_age = author_age[0] if len(author_age) > 0 else None
                content = i.xpath('./a[@class="contentHerf"]/div/span/text()')
                content_vote = i.xpath('./div[@class="stats"]/span[1]/i/text()')
                content_vote = content_vote[0] if len(content_vote) > 0 else None
                content_comment_numbers = i.xpath('./div[@class="stats"]/span[2]/a/i/text()')
                content_comment_numbers = content_comment_numbers[0] if len(content_comment_numbers) > 0 else None
                hot_comment_author = i.xpath('./a[@class="indexGodCmt"]/div/span[last()]/text()')
                hot_comment_author = hot_comment_author[0] if len(hot_comment_author) > 0 else None
                hot_comment = i.xpath('./a[@class="indexGodCmt"]/div/div/text()')
                hot_comment = hot_comment[0].replace("\n:", "").replace("\n", "") if len(hot_comment) > 0 else None
                hot_comment_like_num = i.xpath('./a[@class="indexGodCmt"]/div/div/div/text()')
                hot_comment_like_num = hot_comment_like_num[-1].replace("\n", "") if len(hot_comment_like_num) > 0 else None
                item = dict(
                    author_name=author_name,
                    author_img=author_img,
                    author_href=author_href,
                    author_gender=author_gender,
                    author_age=author_age,
                    content=content,
                    content_vote=content_vote,
                    content_comment_numbers=content_comment_numbers,
                    hot_comment=hot_comment,
                    hot_comment_author=hot_comment_author,
                    hot_comment_like_num=hot_comment_like_num,
                )
                items.append(item)
            self.content_queue.put(items)
            self.html_queue.task_done()  # task_done() decrements the queue's task counter

    def save_items(self):
        """Append one page's items to qiubai.txt as JSON."""
        while True:
            items = self.content_queue.get()
            with open("qiubai.txt", "a") as f:
                for i in items:
                    json.dump(i, f, ensure_ascii=False, indent=2)
                    f.write("\n")  # keep the records separated
            self.content_queue.task_done()

    def run(self):
        thread_list = []
        # 1. Generate the URLs.
        thread_list.append(threading.Thread(target=self.get_total_url))
        # 2. Send the network requests (ten parser threads).
        for i in range(10):
            thread_list.append(threading.Thread(target=self.parse_url))
        # 3. Extract the data.
        thread_list.append(threading.Thread(target=self.get_content))
        # 4. Save it.
        thread_list.append(threading.Thread(target=self.save_items))
        for t in thread_list:
            t.daemon = True  # daemon threads: when the main thread exits, they exit too
            t.start()
        # To keep the program from hanging on exit we do not join() the worker
        # threads; instead the main thread waits on the queues and may exit
        # only once all of them have been fully processed.
        self.url_queue.join()
        self.html_queue.join()
        self.content_queue.join()


if __name__ == "__main__":
    qiubai = Qiubai()
    qiubai.run()
```
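The shutdown scheme in run() is the part most worth understanding: every worker loops forever, so the threads are made daemons (they die when the main thread exits), and the main thread blocks on queue.join() instead, which returns only once every put() has been matched by a task_done(). Here is the same pattern stripped to its core (illustrative names, not part of the crawler above):

```python
import threading
from queue import Queue

in_q, out_q = Queue(), Queue()

def producer():
    for i in range(5):
        in_q.put(i)

def worker():
    while True:                      # daemon thread: killed when the main thread exits
        item = in_q.get()
        out_q.put(item * 2)          # hand the result to the next stage first...
        in_q.task_done()             # ...then mark this stage's task finished

def consumer():
    while True:
        print("result:", out_q.get())
        out_q.task_done()

for target in (producer, worker, consumer):
    t = threading.Thread(target=target)
    t.daemon = True                  # modern spelling of setDaemon(True)
    t.start()

# join() on each queue blocks until its unfinished-task counter hits zero.
# Once both queues are drained, the main thread returns and the daemon
# threads die with it, so the process exits cleanly.
in_q.join()
out_q.join()
```

Note that each worker calls put() on the downstream queue before task_done() on the upstream one; this ordering guarantees that by the time a queue's join() returns, every result derived from it has already been handed to the next stage.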