Python執行緒/程序
GIL
GIL
:global interpreter lock
全域性鎖
限制:使得python
中一個執行緒對應於C
語言中的一個執行緒。同一時刻只有一個執行緒執行在一個CPU
上執行位元組碼,無法將多執行緒執行到多CPU
上。
釋放:根據位元組碼執行行數或者時間片來釋放鎖,或者遇到io
操作時會主動釋放。
顯示位元組碼:from dis import dis
區別
- 耗
CPU
操作(例如運算)採用多程序更加具有優勢(GIL
使得多執行緒沒有辦法在不同的CPU
上執行) - 耗
IO
操作採用多執行緒更具有優勢(當碰見IO
操作時,會主動釋放GIL
)
ThreadLocal
使用場景:多執行緒環境下,如果使用全域性變數,需要加鎖。但是如果每個執行緒都需要擁有自己的私有資料時,即這個資料對其他執行緒不可見。可以使用ThreadLocal
import threading global_data = threading.local() def show(): print threading.current_thread().getName(), global_data.num def thread_cal(): global_data.num = 0 for _ in xrange(1000): global_data.num += 1 show() threads = [] for i in range(10): threads.append(threading.Thread(target=thread_cal)) threads[i].start()
join()
和setDaemon()
join
:主執行緒啟動若干個子執行緒後,可以繼續執行主執行緒的程式碼,也可以等待所有子執行緒執行完畢後繼續執行主執行緒。如果子執行緒呼叫了join
方法,則表明主執行緒必須等子執行緒執行完了才可以往下繼續執行。因此,如果需要所有的子執行緒都能再主執行緒結束前被執行完畢,則必須為每一個子執行緒都註冊join
。
setDaemon()
:設定此執行緒是否被主執行緒守護回收。需要在start
方法前呼叫,設為True
時,當主執行緒結束之後,會回收此子執行緒。預設是False
,即主執行緒結束時不會回收子執行緒。
建立執行緒
Threading
模組
# -*- coding: utf-8 -*- import time import threading def get_detail_html(url): print("get detail html started") time.sleep(2) print("get detail html end") def get_detail_url(url): print("get detail url started") time.sleep(2) print("get detail url end") if __name__ == "__main__": thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) start_time = time.time() thread1.start() thread2.start() print("last time: {}".format(time.time() - start_time))
派生Thread
的子類,並建立例項
需要過載
run
方法
class GetDetailHtml(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print("get detail html started")
time.sleep(2)
print("get detail html end")
class GetDetailUrl(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
print("get detail url started")
time.sleep(2)
print("get detail url end")
if __name__ == "__main__":
thread1 = GetDetailHtml("get_detail_html")
thread2 = GetDetailUrl("get_detail_url")
start_time = time.time()
thread1.start()
thread2.start()
print("last time: {}".format(time.time() - start_time))
Queue
使用佇列,可以進行阻塞操作。它是基於deque
開發的,因而是執行緒安全的。
初始化:
import Queue
msg_deque = Queue.Queue()
放入佇列:
msg_deque.put(data)
取出佇列,沒有時間預設阻塞等待:
msg_deque.get(block=True)
msg_deque.task_done()
:每次從queue
中get
一個數據之後,當處理好相關問題,最後呼叫該方法,以提示msg_deque.join()
是否停止阻塞。讓執行緒向前執行或者退出:
import queue
q = queue.Queue(2)
q.put(15)
q.put(59)
q.get()
q.task_done() # get取完佇列中的一個值後,使用task_done方法告訴佇列,我已經取出了一個值並處理完畢
q.get()
q.task_done()
q.join()
如果沒有
task_done
,那麼會一直阻塞在join
處
佇列型別:
- 先進先出佇列:
queue.Queue
- 後進先出佇列:
queue.LifoQueue
- 優先順序佇列:
queue.PriorityQueue
- 雙向佇列:
queue.deque
原始碼分析:
def put(self, item, block=True, timeout=None):
# self.not_full是一個條件變數,如果size滿足的話,直接進行put操作(append),並且釋放鎖給get
# 否則等待not_full(由get中的notify觸發)
with self.not_full:
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
get
同理,如果size
為空,則等待not_empty
,否則通知put
可進行操作:
def get(self, block=True, timeout=None):
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
同步機制
除了
Lock
之外,還存在其他的鎖
RLock
:可重入鎖,允許在同一執行緒中被呼叫多次,但是acquire
與release
的數量應當相等。
Condition
:條件同步機制,一個執行緒等待特定條件,另一個執行緒發出特定條件滿足的訊號。具有wait
和notify
兩種方法。Condition
實際上有兩把鎖,當使用wait
方法時,會釋放最外層的鎖,並分配一把鎖,加至__waiters__
這個佇列裡,只有當呼叫notify
方法時,才會從__waiters
方法中釋放鎖。
Semaphore
:通過計數器限制可同時執行的執行緒數量。acquire()
遞減計數器,release()
則是增加計數器。
# -*- coding: utf-8 -*-
import threading
class Client(threading.Thread):
def __init__(self, cond):
super().__init__(name='client')
self.cond = cond
def run(self):
with self.cond:
self.cond.wait()
print("client saying")
self.cond.notify()
class Server(threading.Thread):
def __init__(self, cond):
super().__init__(name='server')
self.cond = cond
def run(self):
with self.cond:
print("server saying")
self.cond.notify()
self.cond.wait()
if __name__ == "__main__":
cond = threading.Condition()
server = Server(cond)
client = Client(cond)
client.start()
server.start()
condition
原始碼分析:
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
# 獲得一個內部鎖
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
# 釋放condition鎖
saved_state = self._release_save()
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
# 等待notify釋放鎖才能再次進入
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
# 恢復condition鎖
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
Semaphore
:主要用於控制獲取鎖的數量
# -*- coding: utf-8 -*-
import threading
class HtmlSpider(threading.Thread):
def __init__(self, url, sem):
super().__init__()
self.url = url
self.sem = sem
def run(self):
import time
time.sleep(2)
print('success!')
self.sem.release()
class UrlProducer(threading.Thread):
def __init__(self, sem):
super().__init__()
self.sem = sem
def run(self):
for i in range(20):
self.sem.acquire()
html_thread = HtmlSpider('http://www.baidu.com', self.sem)
html_thread.start()
if __name__ == '__main__':
sem = threading.Semaphore(3)
url_procuder = UrlProducer(sem)
url_procuder.start()
Event
:一個執行緒通知事件,其他執行緒等待事件。內建了一個初始為False
的標誌,當呼叫set()
時為True
,呼叫clear()
時重置為False
。包括一下方法:
isSet()
:當內建標誌為True
時返回True
set()
:將內建標誌位置為True
clear()
:將標誌設為False
wait[timeout]
:如果標誌位為True
將立即返回,否則阻塞執行緒至等待阻塞狀態,等待其他執行緒呼叫set()
。當timeout
不為空時,如果發生了超時,則返回值為False
,否則返回狀態為True
。
舉個例子:
# -*- coding: utf-8 -*-
import threading
import time
event = threading.Event()
def func():
# 等待事件,進入等待阻塞狀態
print '%s wait for event...' % threading.currentThread().getName()
event.wait()
# 收到事件後進入執行狀態
print '%s recv event.' % threading.currentThread().getName()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()
time.sleep(2)
# 傳送事件通知
print 'MainThread set event.'
event.set()
在實際中,通常為每個執行緒準備一個獨立的
Event
,而不是多個執行緒共享。應用場景,當用戶成功上線了,才開啟接收訊息的執行緒。
concurrent.futures
執行緒池
使用標準庫的from concurrent.futures
下的ThreadPoolExecutor
。
主執行緒可以獲取某一個執行緒的狀態或者某一個任務的狀態(done
),以及返回值(result
)。
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ThreadPoolExecutor(max_workers=2) # 建立一個最大可容納2個task的執行緒池
future1 = pool.submit(return_future_result, ("hello")) # 往執行緒池裡面加入一個task
future2 = pool.submit(return_future_result, ("world")) # 往執行緒池裡面加入一個task
print(future1.done()) # 判斷task1是否結束
time.sleep(3)
print(future2.done()) # 判斷task2是否結束
print(future1.result()) # 檢視task1返回的結果
print(future2.result()) # 檢視task2返回的結果
ThreadPoolExecutor(pool_count)
中pool_count
代表建立執行緒的數量,會返回一個該執行緒池的執行者物件,這個物件的submit()
方法和map()
方法,能夠使用執行緒池中的執行緒來執行我們指定的方法,並且返回一個Future
物件。並且當執行緒結束之後會自動歸還,如果執行緒全部被佔用,則會阻塞。
如果需要新增引數:
def return_future_result(message, n):
time.sleep(n)
return message
pool = ThreadPoolExecutor(max_workers=2) # 建立一個最大可容納2個task的執行緒池
future1 = pool.submit(return_future_result, "hello", 2) # 往執行緒池裡面加入一個task
future2 = pool.submit(return_future_result, "world", 1) # 往執行緒池裡面加入一個task
通過as_completed()
方法獲取已經完成的任務的結果:
for future in as_completed(all_tasks):
print(future.result())
或者使用map
:
for data in pool.map(get_url, urls):
print(data)
與
as_completed
不同的是,map
返回的不是future
型別,同時輸出順序也與urls
一致。
主要有以下方法:
submit
:立即返回result
:獲取執行結果cancel
:取消task
(未開始執行的),成功返回True
,失敗返回False
as_completed
:是一個生成器,先完成的task
先返回,返回的是future
型別map
:根據傳入的順序進行返回,返回的直接是future.result()
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
def get_url(times):
time.sleep(times)
print("get page {} success".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
# task1 = executor.submit(get_url, (3))
# task2 = executor.submit(get_url, (2))
# print(task1.done())
# print(task2.cancel())
urls = [3, 2, 4]
all_task = [executor.submit(get_url, (url)) for url in urls]
wait(all_task)
print("all task done")
# for future in as_completed(all_task):
# data = future.result()
# print("get page {}".format(data))
# for future in executor.map(get_url, urls):
# print("get page {}".format(future))
上面演示了兩種方法進行多執行緒下載的方式,一種是利用
as_completed()
,另一種是利用ThreadPoolExecutor.map()
。
通過ThreadPoolExecutor.submit()
方法返回的是futures
物件,作為as_completed
的引數,因而可以呼叫其result
方法,並且不會發生阻塞(因為as_completed()
實際上是一個生成器)。
而針對後者,返回值是一個迭代器,通過其__next__
方法呼叫各個futures
的result
方法,因此得到的是futures
的結果。
如果需要提交任務的函式是一樣的,就可以簡化成map
。但是加入提交的任務函式不一樣的,或者執行的過程之可能出現異常,就要用到submit
。
流程圖:
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
executor.map
會建立多個_WorkItem
物件,每個物件都傳入了新建立的一個Future
物件。- 把每個
_WorkItem
物件然後放進一個叫做「Work Items」
的dict
中,鍵是不同的「Work Ids」
。 - 建立一個管理
「Work Ids」
佇列的執行緒「Local worker thread」
,它能做2件事:- 從
「Work Ids」
佇列中獲取Work Id, 通過「Work Items」找到對應的_WorkItem
。如果這個Item
被取消了,就從「Work Items」
裡面把它刪掉,否則重新打包成一個_CallItem
放入「Call Q」
這個佇列。executor
的那些程序會從佇列中取_CallItem
執行,並把結果封裝成_ResultItems
放入「Result Q」
佇列中。 - 從
「Result Q」
佇列中獲取_ResultItems
,然後從「Work Items」
更新對應的Future
物件並刪掉入口。
- 從
程序池
multiprocess
中的方法與多執行緒的比較類似:
apply_async
:非阻塞的,與之對應的是apply
是阻塞的。所以需要使用join
,否則在子程序還沒開始執行之前主程序就已經完成退出了。close
:關閉pool
,使其不再接受新的任務terminate
:結束工作程序,不再處理未完成的任務join
:主程序阻塞,等待子程序的退出,join
方法要在close
或terminate
之後使用。
# -*- coding: utf-8 -*-
import multiprocessing
import time
def get_html(n):
time.sleep(n)
print("sub_progress success")
return n
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
results = []
for i in range(4):
results.append(pool.apply_async(get_html, args=(i,)))
pool.close()
pool.join()
for result in results:
print(result.get())
輸出結果:
sub_progress success
sub_progress success
sub_progress success
sub_progress success
0
1
2
3
程序間通訊
Queue
:不能使用from queue import Queue
,這種方式只適用於同一程序下不同執行緒之間的共享,在多程序程式設計中,每一個程序中都會產生一個相應的副本。應該使用multiprocess.Queue
。但是,這個Queue
不能用於程序池。執行緒池中應該使用Manager().Queue()
。
# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Queue
from threading import Thread
# from queue import Queue
def producer(queue):
queue.put("a")
time.sleep(2)
def consumer(queue):
time.sleep(2)
data = queue.get()
print(data)
if __name__ == '__main__':
queue = Queue(10)
my_producer = Process(target=producer, args=(queue,))
my_consumer = Process(target=consumer, args=(queue,))
my_consumer.start()
my_producer.start()
my_producer.join()
my_consumer.join()
Manager()
中定義了很多共享變數。
Pipe
:只適用於兩個程序之間的通訊,但是複雜度沒有Queue
高。
# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe
def producer(pipe):
pipe.send("a")
def consumer(pipe):
print(pipe.recv())
if __name__ == '__main__':
send_pipe, recv_pipe = Pipe()
my_producer = Process(target=producer, args=(send_pipe, ))
my_consumer = Process(target=consumer, args=(recv_pipe, ))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()