[Python]多執行緒程式設計&執行緒間共享變數&消費者生產者問題的解決
由於單程序爬蟲的種種弊端,以及大量獲取資料的需要,我最近開始寫分散式爬蟲。儘管網上已經有比較現成的方案,如scrapy+rq等,但是出於種種原因考慮,比如部署的難易程度,任務比較單一,以及想自己練練手等,還是決定由自己實現儘可能多的功能。
在寫的過程中,不可避免的需要以多執行緒甚至多程序執行程式。因此解決多執行緒間以及多程序間的同步和通訊問題成為必須。由於程序擁有獨立資源,因此多程序同步比多執行緒同步要更難一些。再加上雖然python中的GIL鎖限制了多執行緒執行的效率,但是由於相關任務比較簡單,並不需要太多計算量,因此決定先使用多執行緒程式設計。
為了解決這個問題,我先查了查網上的資料,發現似乎所有的解決方案都是基於單個檔案的。什麼叫基於單個檔案呢?就是執行緒間需要共享的資料都是宣告在主檔案中的,或者main函式中的
a=1 #需要共享的變數
lock=threading.Lock() #鎖
class test():
def __init__(self):
b=2
if __name__=='__main__':
c=3
舉例來說,上面這個檔案,經過測試發現a是全域性變數。而c似乎也是全域性變數,但是我有點不太確定。而b則肯定不是全域性變數。網上普遍的解決方案是,在a的位置宣告共享變數和lock,這樣無論在該檔案中什麼位置更改變數a,都會造成所有位置中a的值改變。再配合lock,能夠解決生產者和消費者問題。但是我發現,當應用場景更復雜一些時,比如需要將這些功能封裝在一個類中並且由其他程序來呼叫這個類時,上述方法不再有效。因為封裝類中的變數不再是全域性變數
我想了想,目前我能想到的解決方法主要有兩個,一個是將變數a宣告為全域性變數,在子執行緒中再宣告一下global a,這樣在該執行緒的子執行緒中,任何一個子執行緒改動a,所有a值都會改變。
__author__ = 'multiangle'
import time
import threading
class a():
def __init__(self):
global data
data=5
thread=b()
thread.start()
self.loop()
def loop(self) :
while True:
time.sleep(0.2)
print(data)
class b(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
while True:
time.sleep(0.4)
global data
data+=1
if __name__=='__main__':
a()
如上所示,能夠得到輸出結果為
5
6
6
7
7
8
8
9
9
10
.
.
.
另外一個方法是將要共享的變數以指標的形式傳遞給各個子執行緒,這樣所有a都指向同一塊記憶體,自然能夠實現變數的同步。由於類物件例項本質也是指標,因此也可以將要共享的變數封裝成類在子執行緒間傳遞,既實現了內容同步,也保護了要共享的變數。這樣有一個潛在的隱患就是所有執行緒的IO壓力都會集中這個類的例項上,可能會造成效能瓶頸,不過目前也不考慮這麼多啦。下面上程式碼
下面這段程式碼涉及了python的多執行緒程式設計,變數共享,還涉及了一些多程序程式設計。
這段程式碼總共4個類,
boss,顧名思義,總攬全域性,跟蹤需要共享變數data的情況,
consum,消費程序
product,生產程序
list_manage , 變數類,核心是要共享的變數pool,還增加了一些外圍方法。
lock和data以指標的形式在子執行緒中傳遞,觀察輸出結果可以發現data是在子執行緒間同步變化的
import threading
import time
from multiprocessing import Process
class boss():
def __init__(self):
data=list_manage()
lock=threading.Lock()
self.data=data
c0=consume(data,'c0',lock)
p=product(data,'p',lock)
c1=consume(data,'c1',lock)
c0.start()
p.start()
c1.start()
self.go()
def go(self):
while True:
time.sleep(0.1)
# print('root process',self.data)
# self.data.print()
print('boss check , ',self.data.show())
class consume(threading.Thread):
def __init__(self,data,id,lock):
threading.Thread.__init__(self)
self.id=id
self.data=data
self.lock=lock
# self.run(data)
def run(self):
while True :
time.sleep(1)
self.lock.acquire()
print('thread ',self.id,' consume once,get ',self.data.get())
self.lock.release()
class product(threading.Thread):
def __init__(self,data,id,lock):
threading.Thread.__init__(self)
self.id=id
self.data=data
self.lock=lock
def run(self):
count=0
while True:
count+=1
time.sleep(0.4)
self.lock.acquire()
self.data.add(count)
self.lock.release()
print('thread ',self.id,' product once, add ',count)
class list_manage():
def __init__(self):
self.pool=[1,2,3]
def get(self):
if self.pool.__len__()>0:
return self.pool.pop()
else:
return None
def add(self,data):
self.pool.append(data)
def print(self):
print(self.pool)
def show(self):
copy=self.pool[:]
return copy
if __name__=='__main__':
p=Process(target=boss,args=())
p.start()
以下為輸出結果,只截取了最開始的一段
boss check , [1, 2, 3]
boss check , [1, 2, 3]
boss check , [1, 2, 3]
thread p product once, add 1
boss check , [1, 2, 3, 1]
boss check , [1, 2, 3, 1]
boss check , [1, 2, 3, 1]
boss check , [1, 2, 3, 1]
thread p product once, add 2
boss check , [1, 2, 3, 1, 2]
boss check , [1, 2, 3, 1, 2]
thread c0 consume once,get 2
thread c1 consume once,get 1
boss check , [1, 2, 3]
boss check , [1, 2, 3]
thread p product once, add 3
boss check , [1, 2, 3, 3]
boss check , [1, 2, 3, 3]
boss check , [1, 2, 3, 3]
boss check , [1, 2, 3, 3]
thread p product once, add 4
boss check , [1, 2, 3, 3, 4]
boss check , [1, 2, 3, 3, 4]
boss check , [1, 2, 3, 3, 4]
boss check , [1, 2, 3, 3, 4]
thread c0 consume once,get 4
thread c1 consume once,get 3
thread p product once, add 5
boss check , [1, 2, 3, 5]
boss check , [1, 2, 3, 5]
boss check , [1, 2, 3, 5]
boss check , [1, 2, 3, 5]
thread p product once, add 6
boss check , [1, 2, 3, 5, 6]
boss check , [1, 2, 3, 5, 6]
boss check , [1, 2, 3, 5, 6]