# Python 3 multithreaded data-fetching example
import requests
import json
from retrying import retry
from lxml import etree
from queue import Queue
import threading
class QiuShi:
    """Multithreaded scraper for qiushibaike.com.

    Producer/consumer pipeline wired with three queues:
    url_queue -> (fetch) -> html_queue -> (parse) -> content_list_queue -> (save).
    Worker methods loop forever in daemon threads; `Queue.task_done()` /
    `Queue.join()` coordinate shutdown of the main thread.
    """

    def __init__(self):
        # The three hand-off queues between pipeline stages.
        self.url_queue = Queue()
        self.html_queue = Queue()
        self.content_list_queue = Queue()
        # Request headers (e.g. a User-Agent can be added here).
        self.headers = {}

    def get_url_list(self):
        """Producer: enqueue the 13 listing-page URLs."""
        for page in range(1, 14):
            self.url_queue.put('https://www.qiushibaike.com/8hr/page/{}/'.format(page))

    def _parse_url(self, url):
        """Fetch *url* and return it parsed as an lxml element tree.

        Retries up to 3 times (plain loop instead of the third-party
        `retrying` decorator, preserving its stop_max_attempt_number=3).
        Raises the last request error if all attempts fail.
        """
        last_error = None
        for _ in range(3):
            try:
                response = requests.get(url, headers=self.headers, timeout=3)
                # Explicit status check: `assert` is stripped under `python -O`.
                response.raise_for_status()
                return etree.HTML(response.content)
            except requests.RequestException as exc:
                last_error = exc
        raise last_error

    def parse_url(self):
        """Worker: take URLs off url_queue, fetch them, push trees to html_queue.

        `get()` blocks while the queue is empty; `task_done()` decrements the
        queue's unfinished-task count so `url_queue.join()` can complete.
        """
        while True:
            url = self.url_queue.get()
            print(url)
            try:
                html = self._parse_url(url)
            except Exception:
                # Best effort: a failed fetch becomes None so the pipeline keeps moving.
                html = None
            self.html_queue.put(html)
            self.url_queue.task_done()

    def get_content_list(self):
        """Worker: parse each HTML tree into a list of item dicts."""
        while True:
            html = self.html_queue.get()
            if html is not None:
                content_list = []
                for div in html.xpath('//div[@id="content-left"]/div'):
                    # Evaluate each xpath once instead of twice per field.
                    names = div.xpath('.//h2/text()')
                    texts = div.xpath('.//div[@class="content"]/span/text()')
                    numbers = div.xpath('.//i[@class="number"]/text()')
                    imgs = div.xpath('.//img/@src')
                    content_list.append({
                        'name': names[0].replace("\n", "") if names else None,
                        'content': texts[0].replace("\n", "") if texts else None,
                        # Second <i class="number"> is the comment count; guard
                        # len > 1 (the original checked > 0 but indexed [1]).
                        'comment': numbers[1] if len(numbers) > 1 else None,
                        'img': imgs if imgs else None,
                    })
                self.content_list_queue.put(content_list)
            # Must run for every get(), even when html is None, or
            # html_queue.join() in run() would block forever.
            self.html_queue.task_done()

    def save_content_list(self):
        """Worker: append parsed items to qiubai.json (single thread, so
        writes from different pages never interleave)."""
        while True:
            content_list = self.content_list_queue.get()
            with open("qiubai.json", "a", encoding="utf-8") as f:
                for content in content_list:
                    json.dump(content, f, ensure_ascii=False, indent=2)
                    f.write(',\n')
            self.content_list_queue.task_done()

    def run(self):
        """Start all worker threads and wait for every queue to drain."""
        thread_list = [threading.Thread(target=self.get_url_list)]
        # Fetching is I/O-bound, so several request threads overlap the waits.
        for _ in range(5):
            thread_list.append(threading.Thread(target=self.parse_url))
        # Parsing is also slow enough to be worth a few threads.
        for _ in range(3):
            thread_list.append(threading.Thread(target=self.get_content_list))
        # Exactly one saver thread so file writes stay ordered.
        thread_list.append(threading.Thread(target=self.save_content_list))
        for t in thread_list:
            t.daemon = True  # modern spelling; setDaemon() is deprecated
            t.start()
        # Main thread exits once every queue's unfinished count reaches zero;
        # the daemon workers are then torn down with the process.
        for q in [self.content_list_queue, self.html_queue, self.url_queue]:
            q.join()
# Entry point: build the scraper and run the full pipeline.
if __name__ == '__main__':
    qiubai = QiuShi()
    qiubai.run()
# End of the Python 3 multithreaded data-fetching example.