1. 程式人生 > >爬蟲與多執行緒

爬蟲與多執行緒

多執行緒和多程序爬蟲

 

一.執行緒

1.什麼是執行緒。

執行緒是作業系統能夠進行運算排程的最小單位。它被包含在程序中,是進城中的實際運作單位。一條執行緒指的是程序中一個單一順序的控制流,一個執行緒可以併發多個執行緒,每條執行緒執行不同的任務。

 

2.執行緒常用的方法

方法

說明

start()

執行緒準備就緒,等待CPU排程

setName()

為執行緒設定名稱

getName()

獲取執行緒名稱

setDaemon()

設定為守護執行緒

Join()

逐個執行每個執行緒,執行完畢後繼續往下執行

run()

執行緒被排程後會執行該方法,如果想自定義執行緒類,需要重寫run()方法

 

3.Threading類

3.1 執行緒的普通建立方式

Threadding用於提供執行緒相關的操作,執行緒是應用程式中工作的最小單元。

import threading
import time
def show(arg):
    time.sleep(
1) print('thread' + str(arg)) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print('主執行緒結束') 結果: 主執行緒結束 thread0 thread1 thread2 thread5 thread4 thread3 thread7 thread6 thread8 thread9

 上述程式碼建立了10個“前臺”執行緒,然後控制器就交給了CPU,CPU根據指定演算法進行排程,分片執行指令。

 

3.2 自定義執行緒類

繼承自threading.Thread類來自定義執行緒類,但是其本質卻是重構thread類中的run()方法。

 

import threading
class
myThread(threading.Thread): def __init__(self, sum): threading.Thread.__init__(self) self.sum = sum
def run(self): print('物件數是:',self.sum) if __name__ == '__main__':
t1
= myThread(1) t2 = myThread(2) t1.start() t2.start()

 

3.3 計運算元執行緒執行的時間 

PS:sleep的時候是不會佔用CPU的,作業系統會把執行緒掛起。

import threading
import time

def show(n):
    time.sleep(1)
    print('thread' + str(n))

start_time = time.time()
obj_list = []
for i in range(5): t = threading.Thread(target=show,args=(i,)) t.start() obj_list.append(t) for obj in obj_list: obj.join() print('花費的時間為:',time.time() - start_time)

 

3.4 守護執行緒 

執行緒的setDaemon(True)將執行緒變成主執行緒的守護執行緒,意思是當主程序結束後,子執行緒也會隨之退出。意味著當主執行緒結束後,程式就結束了。

 1 import threading
 2 import time
 3 
 4 def show(n):
 5     time.sleep(1)
 6     print('thread' + str(n))
 7 
 8 start_time = time.time()
 9 obj_list = []
10 
11 for i in range(5):
12     t = threading.Thread(target=show,args=(i,))
13     t.setDaemon(True)
14     t.start()
15     obj_list.append(t)
16 
17 print('花費的時間為:',time.time() - start_time)

 

3.5 GIL(全域性直譯器鎖) 

在Python的執行環境中,無論電腦是單核還是雙核,作業系統同時只會執行一個執行緒。究其原因,是因為GIL(全域性直譯器鎖)。

在Python中,一個執行緒要想要執行,必須要先拿到GIL。可以吧GIL想象成一個“通行證”,並且在一個程序中,GIL只有一個。沒有通行證的執行緒就不會被執行。

Python多執行緒的工作過程:

  •  拿到公共資料
  • 申請GIL
  • Python直譯器呼叫os的原生執行緒
  • os操作CPU執行運算
  • 當該執行緒的執行時間到了之後,無論是否執行完,GIL被釋放
  • 其他執行緒重複上面的操作
  • 其他程序執行完成後,切換到原來的執行緒(從記錄的上下文繼續執行)

 

3.6 執行緒鎖(Lock,RLock)

由於執行緒之間是進行隨機排程,並且每個執行緒可能只執行n條執行之後,當多個執行緒同時修改同一條資料時可能會出現髒資料,所以,出現了執行緒鎖 - 同一時刻允許一個執行緒執行操作。

import threading,time

def run(n):
    global num
    num += 1

num = 0
obj_list = []

for i in range(20000):
    t = threading.Thread(target=run,args=(i,))
    t.start()
    obj_list.append(t)

for obj in obj_list:
    obj.join()

print('num:',num)
髒資料:19999

 


3.6.1  互斥鎖(Lock) 

為了防止上面情況的發生,我們可以使用互斥鎖(Lock)來解決。

import threading,time

lock = threading.Lock() # 例項化一個鎖物件

def run(n):
    lock.acquire()  # 獲取鎖
    global num
    num += 1
    lock.release()   # 釋放鎖

num = 0
obj_list = []

for i in range(20000):

    t = threading.Thread(target=run,args=(i,))
    t.start()
    obj_list.append(t)

for obj in obj_list:
    obj.join()
print('num:',num)

 

3.6.2  遞迴鎖(RLock) 

RLock的用法和Lock一樣,只是他支援巢狀。在多個鎖沒有釋放的時候一般會使用Rlock類。

import threading,time

lock = threading.RLock() # 例項化一個鎖物件
num = 0
obj_list = []
def run(n):
    lock.acquire() # 獲取鎖 
    global num 
    num += 1 
    lock.release() # 釋放鎖 

for i in range(20000): 
    t = threading.Thread(target=run,args=(i,)) 
    t.start() 
    obj_list.append(t) 

for obj in obj_list: 
    obj.join() 
print('num:',num)

 

3.7 訊號量Semaphore 

互斥鎖 同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人只能等裡面有人出來了才能再進去。

  import threading,time

  lock = threading.BoundedSemaphore(6) # 例項化一個鎖物件

  def run(n):

    lock.acquire()  # 獲取鎖

    time.sleep(1)

    print('run the thread: %s' % n)

    lock.release()   # 釋放鎖

  num = 0

  for i in range(200):

    t = threading.Thread(target=run,args=(i,))

    t.start()

 

3.8事件(Event)

python執行緒的事件用於主執行緒控制其他執行緒的執行,事件是一個簡單的執行緒同步物件,主要提供了以下幾種方法:

方法

說明

clear()

將flag設定為“false”

set()

將flag設定為“true”

is_set()

判斷是否設定了flag

wait()

一直監聽flag,沒有檢測到會一直處於阻塞狀態

事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞

 

import threading,time

event = threading.Event()  # 建立事件物件

def lighter():
    count = 0
    event.set()   #初始值為綠燈
    while 1:
        if 5 < count <= 10:
            event.clear() #紅燈,清楚標誌位
            print('\33[41;1mred light is on...\033[0m')
        elif count > 10:
            event.set()   # 綠燈,設定標誌位
            count = 0
        else:
            print('\33[41;1mred light is on...\033[0m')
        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():  # 判斷是否設定了標誌位
            print("[%s] 綠燈亮,請行駛..." % name)
            time.sleep(1)
        else:
            print("[%s] 紅燈亮,請等待..." % name)
            event.wait()
            print("[%s] 綠燈亮,開始行駛..." % name)

light = threading.Thread(target=lighter,)

car = threading.Thread(target=car, args=('test',))
light.start()  
car.start()

 

 


3.9條件(Condition)
 

使得執行緒等待,只有滿足某條件時,才釋放n個執行緒。

互斥鎖是最簡單的執行緒同步機制,Python提供的Condition物件提供了對複雜執行緒同步問題的支援。Condition被稱為條件變數,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。執行緒首先acquire一個條件變數,然後判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他執行緒,其他處於wait狀態的執行緒接到通知後會重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。

可以認為Condition物件維護了一個鎖(Lock/RLock)和一個waiting池。執行緒通過acquire獲得Condition物件,當呼叫wait方法時,執行緒會釋放Condition內部的鎖並進入blocked狀態,同時在waiting池中記錄這個執行緒。當呼叫notify方法時,Condition物件會從waiting池中挑選一個執行緒,通知其呼叫acquire方法嘗試取到鎖。

Condition物件的建構函式可以接受一個Lock/RLock物件作為引數,如果沒有指定,則Condition物件會在內部自行建立一個RLock。

除了notify方法外,Condition物件還提供了notifyAll方法,可以通知waiting池中的所有執行緒嘗試acquire內部鎖。由於上述機制,處於waiting狀態的執行緒只能通過notify方法喚醒,所以notifyAll的作用在於防止有執行緒永遠處於沉默狀態。

演示條件變數同步的經典問題是生產者與消費者問題:假設有一群生產者(Producer)和一群消費者(Consumer)通過一個市場來互動產品。生產者的”策略“是如果市場上剩餘的產品少於1000個,那麼就生產100個產品放到市場上;而消費者的”策略“是如果市場上剩餘產品的數量多餘100個,那麼就消費3個產品。用Condition解決生產者與消費者問題的程式碼如下:

import threading
import time

class Producer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():
                if count > 1000:
                    con.wait()
                else:
                    count = count+100
                    msg = self.name+' produce 100, count=' + str(count)
                    print msg
                    con.notify()
                con.release()
                time.sleep(1)


class Consumer(threading.Thread):
    def run(self):
        global count
        while True:
            if con.acquire():
                if count < 100:
                    con.wait()
                else:
                    count = count-3
                    msg = self.name+' consume 3, count='+str(count)
                    print msg
                    con.notify()
                con.release()
                time.sleep(1)

count = 500
con = threading.Condition()

def test():
    for i in range(2):
        p = Producer()
        p.start()

    for i in range(5):
        c = Consumer()
        c.start()

if __name__ == '__main__':
    test()

 

  

4.多執行緒的爬蟲例項

'''使用多執行緒爬取1500個url中的圖片'''

import
threading,requests,time link_list = [] with open('url.txt','r') as f: for u in f.readlines(): url = u.split()[0].replace('\n','') link_list.append(url) start_time = time.time() class myThread(threading.Thread): def __init__(self,name,link_range): threading.Thread.__init__(self) self.name = name self.link_range = link_range self.userName = 0 def run(self): print('開始:',self.name) self.craw(self.name,self.link_range) print('結束:',self.name) def writeImages(self, ThreadName, url): print("正在儲存檔案 %s ..." % ThreadName+str(self.userName)) path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png' file = open(path, 'wb') images = requests.get(url,timeout = 20).content file.write(images) file.close() self.userName += 1 def craw(self,name,link_range): for i in range(link_range[0],link_range[len(link_range)-1]): self.writeImages(name,link_list[i]) obj_list = [] url_list = [(0,300),(301,600),(601,900),(901,1200),(1201,1500)] # 建立新執行緒 for i in range(5): t = myThread('Thread-'+str(i+1), url_list[i]) t.start() obj_list.append(t) # 等待所有執行緒執行完成 for url in url_list: url.join() end_time = time.time() print('爬蟲的執行時間為:',end_time - start_time)

 

5.佇列(queue)

queue是python中的標準庫,俗稱佇列。在python中,多個執行緒之間的資料是共享的,多個執行緒進行資料交換的時候,不能夠保證資料的安全性和一致性,所以當多個執行緒需要進行資料交換的時候,佇列就出現了,佇列可以完美解決執行緒間的資料交換,保證執行緒間資料的安全性和一致性。

PS: 在python2.x中,模組名為Queue

queue模組有三種佇列及建構函式

l  Python queue模組的FIFO佇列先進先出。 queue.Queue(maxsize)

l  LIFO類似於堆,即先進後出。 queue.LifoQueue(maxsize)

l  還有一種是優先順序佇列級別越低越先出來。 queue.PriorityQueue(maxsize)

queue模組中的常用方法:

方法

說明

queue.qsize()

返回佇列的大小

queue.empty()

如果佇列為空,返回True,反之False

queue.full()

如果佇列滿了,返回True,反之False (queue.full 與 maxsize 大小對應)

queue.get([block[, timeout]])

獲取佇列,立即取出一個元素, timeout超時時間

queue.put(item[, timeout]])

寫入佇列,立即放入一個元素, timeout超時時間

queue.join()

阻塞呼叫執行緒,直到佇列中的所有任務被處理掉, 實際上意味著等到佇列為空,再執行別的操作

queue.task_done()

在完成一項工作之後,queue.task_done()函式向任務已經完成的佇列傳送一個訊號

 

 

5.1程式碼例項

l   建立佇列

import queue
q = queue.Queue()

l   empty方法(如果佇列為空,返回True)

import queue
q = queue.Queue()
print(q.empty())
#輸出:True

l  full方法(如果佇列滿了,返回True)

import queue
q = queue.Queue(1) #指定佇列大小
q.put('a')
print(q.full())
#輸出:True

l  put方法和get方法

import queue
q = queue.Queue()
q.put('a')
q.put('b')
print(q.get())
#輸出:a
print(q.get())
#輸出:b

l  qsize方法(返回佇列裡元素個數)

import queue
q = queue.Queue()
q.put('a')
q.put('b')
print(q.qsize())
#輸出:2


5.2
 生產者消費者 

import threading,time
import queue

q = queue.Queue(maxsize=10)

# 生產者
def Producer(name):
    count = 1
    while True:
        q.put("冠軍%s" % count)
        print("冠軍",count)
        count +=1
        time.sleep(0.1)

#消費者
def  Consumer(name):
    #while q.qsize()>0:
    while True:
        print("[%s] 取到[%s]..." %(name, q.get()))
        time.sleep(1)

p = threading.Thread(target=Producer,args=("IG",))
c = threading.Thread(target=Consumer,args=("LPL",))
c1 = threading.Thread(target=Consumer,args=("LCK",))
p.start()
c.start()
c1.start()

 

6.多執行緒和爬蟲例項。

import threading,requests,time
import queue

start_time = time.time()
obj_list = []
work_queue = queue.Queue(1500)
link_list = []
with open('url.txt','r') as f:
    for u in f.readlines():
        url = u.split()[0].replace('\n','')
        link_list.append(url)

# 填充佇列
for url in link_list:
    work_queue.put(url)

class myThread(threading.Thread):
    def __init__(self,name,q):
        threading.Thread.__init__(self)
        self.name = name
        self.userName = 0
        self.q = q

    def run(self):
        print('開始:',self.name)
        while True:
            try:
                self.craw(self.name,self.q)
            except Exception as e:
                break
        print('結束:',self.name)

    def writeImages(self, ThreadName, url):
        print("正在儲存檔案 %s ..." % ThreadName+str(self.userName))
        path = r'D:\zhappian' + '\\' + ThreadName+str(self.userName) + '.png'
        file = open(path, 'wb')
        images = requests.get(url,timeout = 20).content
        file.write(images)
        file.close()
        self.userName += 1

    def craw(self,name,q):
        url = q.get(timeout = 2)
        try:
            self.writeImages(name, url)
        except Exception as e:
            print(q.qsize(),url,e)

# 建立新執行緒
for i in range(5):
    t = myThread('Thread-'+str(i+1), work_queue)
    t.start()
    obj_list.append(t)

# 等待所有執行緒執行完成
for url in obj_list:
    url.join()

end_time = time.time()
print('爬蟲的執行時間為:',end_time - start_time)