python 並發和線程
並發和線程
基本概念 - 並行、並發
並行, parallel
互不幹擾的在同一時刻做多件事;
如,同一時刻,同時有多輛車在多條車道上跑,即同時發生的概念.
並發, concurrency
同時做某些事,但是強調同一時段做多件事.
如,同一路口,發生了車輛要同時通過路面的事件.
隊列, 緩沖區
類似排隊,是一種天然解決並發的辦法.排隊區域就是緩沖區.
解決並發:
【 "食堂打飯模型", 中午12點,大家都湧向食堂,就是並發.人很多就是高並發.】
1、隊列, 緩沖區:
隊列: 即排隊.
緩沖區: 排成的隊列.
優先隊列: 如果有男生隊伍和女生隊伍,女生隊伍優先打飯,就是優先隊列.
2、爭搶:
鎖機制: 爭搶打飯,有人搶到,該窗口在某一時刻就只能為這個人服務,鎖定窗口,即鎖機制.
爭搶也是一種高並發解決方案,但是有可能有人很長時間搶不到,所以不推薦.
3、預處理:
統計大家愛吃的菜品,最愛吃的80%熱門菜提前做好,20%冷門菜現做,這樣即使有人鎖定窗口,也能很快釋放.
這是一種提前加載用戶需要的數據的思路,預處理思想,緩存常用.
4、並行:
開多個打飯窗口,同時提供服務.
IT日常可以通過購買更多服務器,或多開線程,進程實現並行處理,解決並發問題.
這是一種水平擴展的思路.
註: 如果線程在單CPU上處理,就不是並行了.
5、提速:
通過提高單個窗口的打飯速度,也是解決並發的方式.
IT方面提高單個CPU性能,或單個服務器安裝更多的CPU.
這是一種垂直擴展的思想.
6、消息中間件:
如上地地鐵站的九曲回腸的走廊,緩沖人流.
常見消息中間件: RabbitMQ, ActiveMQ(Apache), RocketMQ(阿裏Apache), kafka(Apache)等.
【以上例子說明: 技術源於生活! 】
進程和線程
a) 在實現了線程的操作系統中,線程是操作系統能夠運算調度的最小單位.
b) 線程被包含在進程中,是進程的實際運作單位.
c) 一個程序的執行實例就是一個進程.
進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎.
進程和程序的關系: 進程是線程的容器.
Linux進程有父進程和子進程之分,windows的進程是平等關系.
線程有時稱為輕量級進程,一個標準的線程由線程ID,當前指令指針,寄存器集合和堆棧組成.
當運行一個程序時,OS會創建一個進程。它會使用系統資源(CPU、內存和磁盤空間)和OS內核中的數據結構(文件、網絡連接、用量統計等)。
進程之間是互相隔離的,即一個進程既無法訪問其他進程的內容,也無法操作其他進程。
操作系統會跟蹤所有正在運行的進程,給每個進程一小段運行時間,然後切換到其他進程,這樣既可以做到公平又可以響應用戶操作。
可以在圖形界面中查看進程狀態,在在Windows上可以使用任務管理器。也可以自己編寫程序來獲取進程信息。
# 獲取正在運行的python解釋器的進程號和當前工作目錄,及用戶ID、用戶組ID。
In [1]: import os In [2]: os.getpid() Out[2]: 2550 In [3]: os.getuid() Out[3]: 0 In [4]: os.getcwd() Out[4]: ‘/root‘ In [5]: os.getgid() Out[5]: 0 In [6]:
對線程、線程的理解:
- 進程是獨立的王國,進程間不能隨便共享數據.
- 線程是省份,同一進程內的線程可以共享進程的資源,每一個線程有自己獨立的堆棧.
線程的狀態:
- 就緒(Ready): 線程一旦運行,就在等待被調度.
- 運行(Running): 線程正在運行.
- 阻塞(Blocked): 線程等待外部事件發生而無法運行,如I/O操作.
- 終止(Terminated): 線程完成或退出,或被取消.
python中的進程和線程: 進程會啟動一個解釋器進程,線程共享一個解釋器進程.
python的線程開發
python線程開發使用標準庫threading.
thread類
# 簽名
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
- target: 線程調用的對象,就是目標函數.
- name: 為線程起個名字.
- args: 為目標函數傳遞實參, 元組.
- kwargs: 為目標函數關鍵字傳參, 字典.
線程啟動
import threading
# 最簡單的線程程序
def worker(): print("I‘m working") print("Finished") t = threading.Thread(target=worker, name=‘worker‘) # 線程對象. t.start()
通過threading.Thread創建一個線程對象,target是目標函數,name可以指定名稱.
但是線程沒有啟動,需要調用start方法.
線程會執行函數,是因為線程中就是執行代碼的,而最簡單的封裝就是函數,所以還是函數調用.
函數執行完,線程也會隨之退出.
如果不讓線程退出,或者讓線程一直工作: 函數內部使用while循環.
import threading import time def worker(): while True: time.sleep(1) print("I‘m work") print(‘Finished‘) t = threading.Thread(target=worker, name=‘worker‘) # 線程對象. t.start() # 啟動.
線程退出
python沒有提供線程退出的方法,在下面情況時會退出:
- 線程函數內語句執行完畢.
- 線程函數中拋出未處理的異常.
import threading import time def worker(): count = 0 while True: if (count > 5): raise RuntimeError() # return time.sleep(1) print("I‘m working") count += 1 t = threading.Thread(target=worker, name=‘worker‘) # 線程對象. t.start() # 啟動. print("==End==")
python的線程沒有優先級,沒有線程組的概念,也不能被銷毀、停止、掛起,自然也沒有恢復、中斷.
線程的傳參
import threading import time def add(x, y): print(‘{} + {} = {}‘.format(x, y, x + y, threading.current_thread())) thread1 = threading.Thread(target=add, name=‘add‘, args=(4, 5)) # 線程對象. thread1.start() # 啟動. time.sleep(2) thread2 = threading.Thread(target=add, name=‘add‘,args=(5, ), kwargs={‘y‘: 4}) # 線程對象. thread2.start() # 啟動. time.sleep(2) thread3 = threading.Thread(target=add, name=‘add‘, kwargs={‘x‘: 4, ‘y‘: 5}) # 線程對象. thread3.start() # 啟動.
線程傳參和函數傳參沒什麽區別,本質上就是函數傳參.
threading的屬性和方法
current_thread() # 返回當前線程對象.
main_thread() # 返回主線程對象.
active_count() # 當前處於alive狀態的線程個數.
enumerate() # 返回所有活著的線程的列表,不包括已經終止的線程和未開始的線程.
get_ident() # 返回當前線程ID,非0整數.
active_count、enumerate方法返回的值還包括主線程。
import threading import time def showthreadinfo(): print(‘currentthread = {}‘.format(threading.current_thread())) print(‘main thread = {}‘.format(threading.main_thread()), ‘"主線程對象"‘) print(‘active count = {}‘.format(threading.active_count()), ‘"alive"‘) def worker(): count = 1 showthreadinfo() while True: if (count > 5): break time.sleep(1) count += 1 print("I‘m working") t = threading.Thread(target=worker, name=‘worker‘) # 線程對象. showthreadinfo() t.start() # 啟動. print(‘==END==‘)
thread實例的屬性和方法
name: 只是一個名稱標識,可以重名, getName()、setName()來獲取、設置這個名詞。
ident: 線程ID, 它是非0整數。線程啟動後才會有ID,否則為None。線程退出,此ID依舊可以訪問。此ID可以重復使用。
is_alive(): 返回線程是否活著。
註: 線程的name是一個名稱,可以重復; ID必須唯一,但可以在線程退出後再利用。
import threading import time def worker(): count = 0 while True: if (count > 5): break time.sleep(1) count += 1 print(threading.current_thread().name, ‘~~~~~~~~~~~~~~~~~~~~~~~‘) t = threading.Thread(name=‘worker‘, target=worker) print(t.ident) t.start() while True: time.sleep(1) if t.is_alive(): print(‘{} {} alive‘.format(t.name, t.ident)) else: print(‘{} {} dead‘.format(t.name, t.ident)) t.start()
start(): 啟動線程。每一個線程必須且只能執行該方法一次。
run(): 運行線程函數。
為了演示,派生一個Thread子類
# start方法.
import threading import time def worker(): count = 0 while True: if (count >= 5): break time.sleep(1) count += 1 print(‘worker running‘) class MyThread(threading.Thread): def start(self): print(‘start~~~~~~~~~~~~~‘) super().start() def run(self): print(‘run~~~~~~~~~~~~~~~~~‘) super().run() t = MyThread(name=‘worker‘, target=worker) t.start()
run方法
import threading import time def worker(): count = 0 while True: if (count > 5): break time.sleep(1) count += 1 print(‘worker running‘) class MyThread(threading.Thread): def start(self): print(‘start~~~~~~~~~~~~~~~‘) super().start() def run(self): print(‘run~~~~~~~~~~~~~~~~~‘) super().run() t = MyThread(name=‘worker‘, target=worker)
# t.start()
t.run()
start()方法會調用run()方法,而run()方法可以運行函數。
這兩個方法看似功能重復,但不能只留其一,如下:
import threading import time def worker(): count = 0 while True: if (count > 5): break time.sleep(1) count += 1 print("worker running") print(threading.current_thread().name) class MyThread(threading.Thread): def start(self): print(‘start~~~~~~~~~~~~~‘) super().start() def run(self): print(‘run~~~~~~~~~~~~~~~‘) super().run() t = MyThread(name=‘worker‘, target=worker) # t.start()
t.run() # 分別執行start或者run方法。
使用start方法啟動線程,啟動了一個新的線程,名字叫做worker running,但是使用run方法啟動的線程,並沒有啟動新的線程,只是在主線程中調用了一個普通的函數而已。
因此,啟動線程要使用start方法,才能啟動多個線程。
多線程
顧名思義,多個線程,一個進程中如果有多個線程,就是多線程,實現一種並發。
import threading import time def worker(): count = 0 while True: if (count > 5): break time.sleep(2) count += 1 print(‘worker running‘) print(threading.current_thread().name, threading.current_thread().ident) class MyThread(threading.Thread): def start(self): print(‘start~~~~~~~~~~~~~~‘) super().start() def run(self): print(‘run~~~~~~~~~~~~~~~~~~‘) super().run() # 查看父類在做什麽? t1 = MyThread(name=‘worker1‘, target=worker) t2 = MyThread(name=‘worker2‘, target=worker) t1.start() t2.start()
可以看到worker1和worker2交替執行。
換成run方法試試:
import threading import time def worker(): count = 0 while True: if (count > 5): break time.sleep(1) count += 1 print(‘worker running‘) print(threading.current_thread().name, threading.current_thread().ident) class MyThread(threading.Thread): def start(self): print(‘start~~~~~~‘) super().start() def run(self): print(‘run~~~~~~~~~~~~‘) super().run() t1 = MyThread(name=‘worker1‘, target=worker) t2 = MyThread(name=‘worker2‘, target=worker) # t1.start() # t2.start() t1.run() t2.run()
沒有開新的線程,這就是普通函數調用,所以執行完t1.run(),然後執行t2.run(),這裏就不是多線程。
當使用start方法啟動線程後,進程內有多個活動的線程並行的工作,就是多線程。
一個進程中至少有一個線程,並作為程序的入口,這個線程就是主線程。一個進程至少有一個主線程。
其他線程稱為工作線程。
線程安全
需要在ipython中演示:
In [1]: import threading ...: def worker(): ...: for x in range(5): ...: print("{} is running".format(threading.current_thread().name)) ...: ...: for x in range(1, 5): ...: name = ‘worker{}‘.format(x) ...: t = threading.Thread(name=name, target=worker) ...: t.start() ...:
可以看到運行結果中,本應該是一行行打印,但很多字符串打印在了一起,這說明print函數被打斷了,被線程切換打斷了。
print函數分兩步,第一步打印字符串,第二部換行,就在這之間,發生了線程的切換。
說明print函數不是線程安全函數。
線程安全: 線程執行一段代碼,不會產生不確定的結果,那這段代碼就是線程安全的。
1、不讓print打印換行:
import threading def worker(): for x in range(100): print(‘{} is running\n‘.format(threading.current_thread().name), end=‘‘) for x in range(1, 5): name = ‘worker{}‘.format(x) t = threading.Thread(name=name, target=worker) t.start()
字符串是不可變類型,它可以作為一個整體不可分割輸出。end=‘‘的作用就是不讓print輸出換行。
2、使用logging.
標準庫裏面的logging模塊、日誌處理模塊、線程安全、生成環境代碼都使用logging。
import threading import logging def worker(): for x in range(100): # print("{} is running.\n".format(threading.current_thread().name), end=‘‘) logging.warning(‘{} is running‘.format(threading.current_thread().name)) for x in range(1, 5): name = ‘work{}‘.format(x) t = threading.Thread(name=name, target=worker) t.start()
daemon線程和non-daemon線程
註:這裏的daemon不是Linux中的守護進程。
進程靠線程執行代碼,至少有一個主線程,其他線程是工作線程。
主線程是第一個啟動的線程。
父線程:如果線程A中啟動了一個線程B,A就是B的父線程。
子線程:B就是A的子線程。
python中構造線程的時候可以設置daemon屬性,這個屬性必須在start方法之前設置好。
源碼Thread的__init__方法中:
if daemon is not None: self._daemonic = daemon # 用戶設定bool值。 else: self._daemonic = current_thread().daemon self._ident = None
線程daemon屬性,如果設定就是用戶的設置,否則就取當前線程的daemon值。
主線程是non-daemon,即daemon=False。
import time import threading def foo(): time.sleep(5) for i in range(20): print(i)
# 主線程是non-daemon線程.
t = threading.Thread(target=foo, daemon=False)
t.start()
print(‘Main Thread Exiting‘)
運行發現線程t依然執行,主線程已經執行完,但是一直等著線程t.
修改為 t = threading.Threading(target=foo, daemon=True),運行發現主線程執行完程序立即結束了,根本沒有等線程t.
import threading import logging logging.basicConfig(level=logging.INFO) #警告級別 import time def worker(): for x in range(10): time.sleep(1) msg = ("{} is running".format(threading.current_thread())) logging.info(msg) t = threading.Thread(target=worker1,name="worker1-{}".format(x),daemon=False) t.start() # t.join() def worker1(): for x in range(10): time.sleep(0.3) msg = ("¥¥¥¥¥{} is running".format(threading.current_thread())) logging.info(msg) t = threading.Thread(target=worker,name=‘worker-{}‘.format(0),daemon=True) t.start() # t.join() time.sleep(0.3) print(‘ending‘) print(threading.enumerate())
結論:
daemon=False 運行發現子線程依然執行,主線程已經執行完,但是主線程會一直等著子線程執行完.
daemon=True 運行發現主線程執行完程序立即結束了。
daemon屬性:表示線程是否是daemon線程,這個值必須在start()之前設置,否則引發RuntimeError異常。
isDaemon():是否是daemon線程。
setDaemon:設置為daemon線程,必須在start方法之前設置。
總結:
線程具有一個daemon屬性,可以顯式設置為True或False,也可以不設置,不設置則取默認值None。
如果不設置daemon,就取當前線程的daemon來設置它。子子線程繼承子線程的daemon值,作用和設置None一樣。
主線程是non-daemon線程,即daemon=False。
從主線程創建的所有線程不設置daemon屬性,則默認都是daemon=False,也就是non-daemon線程。
python程序在沒有活著的non-daemon線程運行時退出,也就是剩下的只能是daemon線程,主線程才能退出,否則主線程就只能等待。
如下程序輸出:
import time import threading def bar(): time.sleep(10) print(‘bar‘) def foo(): for i in range(20): print(i) t = threading.Thread(target=bar, daemon=False) t.start() # 主線程是non-daemon線程. t = threading.Thread(target=foo, daemon=True) t.start() print(‘Main Threading Exiting‘)
上例中,沒有輸出bar這個字符串,如何修改才會打印出來bar?
import time import threading def bar(): time.sleep(1) print(‘bar‘) def foo(): for i in range(5): print(i) t = threading.Thread(target=bar, daemon=False) t.start() # 主線程是non-daemon線程. t = threading.Thread(target=foo, daemon=True) t.start() time.sleep(1) print(‘Main Threading Exiting‘)
再看一個例子,看看主線程合適結束daemon線程。
import time import threading def foo(n): for i in range(n): print(i) time.sleep(1) t1 = threading.Thread(target=foo, args = (10, ), daemon=True) # 調換10和20,看看效果。 t1.start() t2 = threading.Thread(target=foo, args = (20, ), daemon=False) t2.start() time.sleep(2) print(‘Main Threading Exiting‘)
上例說明,如果有non-daemon線程的時候,主線程退出時,也不會殺掉所有daemon線程,直到所有non-daemon線程全部結束,
如果還有daemon線程,主線程需要退出,會結束所有 daemon線程,退出。
join方法
import time import threading def foo(n): for i in range(n): print(i) time.sleep(1) t1 = threading.Thread(target=foo, args=(10, ), daemon=False) t1.start() t1.join() # 設置join. print(‘Main Thread Exiting‘)
使用了join方法後,daemon線程執行完了,主線程才退出。
join(timeout=None),是線程的標準方法之一。
一個線程中調用另一個線程的join方法,調用者將被阻塞,直到被調用線程終止。
一個線程可以被join多次。
timeout參數指定調用者等待多久,沒有設置超時,就一直等待被調用線程結束。
調用誰的join方法,就是join誰,就要等誰。
python 並發和線程