Queue是python中的標準庫,可以直接import Queue引用;佇列時執行緒間最常用的互動資料的形式。

對於資源,加鎖是個重要的環節。因為python原生的list,dict等,都是not thread safe的。而Queue,是執行緒安全的,因此在滿足使用條件下,建議使用佇列

  1. 初始化:class Queue.Queue(maxsize)FIFO先進先出
  2. 包中的常用方法:
    • Queue.qszie()返回佇列的大小
    • Queue.empty()如果佇列為空,返回True,否則返回False
    • Queue.full()如果佇列滿了,返回True,反之False
    • Queue.full 與 maxsize大小對應
    • Queue.get([block[, timeout]])獲取佇列,timeout等待事件
  3. 建立一個"佇列"物件
    • import Queue
    • myqueue = Queue.Queue(maxsize=10)
  4. 將一個值放入佇列中
    • myqueue.put(10)
  5. 將一個值從佇列中取出
    • myqueue.get()



#-*- coding:utf-8 -*-

import requests
from lxml import etree
from Queue import Queue import threading import time import json class Thread_crawl(threading.Thread): """ 抓取執行緒類 """ def __init__(self, threadID, q): threading.Thread.__init__(self) self.threadID = threadID self.q = q def run(self): print("String: "+self.threadID) self.qiushi_spider() print("Exiting: "+self.threadID) def qiushi_spider(self): while True: if self.q.empty(): break else: page = self.q.get() print('qiushi_spider=', self.threadID, 'page=', str(page)) url = 'http://www.qiushibaike.com/8hr/page/' + str(page)+"/" headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36', 'Accept-Language':'zh-CN,zh;q=0.8' } #多次嘗試失敗結束,防止死迴圈 timeout = 4 while timeout > 0: timeout -= 1 try: content = requests.get(url, headers = headers) data_queue.put(content.text) break except Exception, e: print "qiushi_spider", e if timeout < 0: print 'timeout', url class Thread_Parser(threading.Thread): """ 頁面解析類 """ def __init__(self, threadID, queue, lock, f): threading.Thread.__init__(self) self.threadID = threadID self.queue = queue self.lock = lock self.f = f def run(self): print("starting ", self.threadID) global total, exitFlag_Parser while not exitFlag_Parser: try: """ 呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block, 預設為True 如果佇列為空且block為True,get()就使呼叫執行緒暫停,直至有專案可用 如果佇列為空且block為False,佇列將引發Empty異常 """ item = self.queue.get(False) if not item: pass self.parse_data(item) self.queue.task_done() print("Thread_Parser=", self.threadID, 'total=', total) except: pass print "Exiting ", self.threadID def parse_data(self, item): """ 解析網頁函式 :param item:網頁內容 :return """ global total try: html = etree.HTML(item) result = html.xpath('//div[contains(@id,"qiushi_tag")]') for site in result: try: imgUrl = site.xpath('.//img/@src')[0] title = site.xpath('.//h2')[0].text content = site.xpath('.//div[@class="content"]/span')[0].text.strip() vote = None comments = None try: # 投票次數 vote = site.xpath('.//i')[0].text # print(vote) #print site.xpath('.//*[@class="number"]')[0].text # 評論資訊 comments = site.xpath('.//i')[1].text except: pass result = { 'imageUrl' : imgUrl, 'title' : title, 'content' : content, 'vote' : vote, 'comments' : comments } with self.lock: self.f.write(json.dumps(result, ensure_ascii=False).encode('utf-8') + '\n') except Exception, e: print("site in result ", e) except Exception, e: print("parse_data", e) with self.lock: total += 1 data_queue = Queue() exitFlag_Parser = False lock = threading.Lock() total = 0 def main(): output = open('qiushibaike.json', 'a') #初始化網頁頁碼page從1-10個頁面 pageQueue = Queue(10) for page in range(1, 11): pageQueue.put(page) #初始化採集執行緒 crawlthreads = [] crawllist = ["crawl-1", "crawl-2", "crawl-3"] for threadID in crawllist: thread = Thread_crawl(threadID, pageQueue) thread.start() crawlthreads.append(thread) # #初始化解析執行緒parseList parserthreads = [] parserList = ["parser-1", "parser-2", "parser-3"] #分別啟動parserList for threadID in parserList: thread = Thread_Parser(threadID, data_queue, lock, output) thread.start() parserthreads.append(thread) # 等待佇列情況 while not pageQueue.empty(): pass #等待所有執行緒完成 for t in crawlthreads: t.join() while not data_queue.empty(): pass #通知執行緒退出 global exitFlag_Parser exitFlag_Parser = True for t in parserthreads: t.join() print 'Exiting Main Thread' with lock: output.close() if __name__ == '__main__': main()