Python第十周 學習筆記(2)_多線程
並發
- concurrency
- 同一時間內出現多個請求,高並發就是短時間內出現大量請求
並行
- parallel
- 並行是解決並發的一個方法
並發的解決
- 食堂打飯模型
1、隊列、緩沖區
- queue(或LifoQueue、PriorityQueue)先進先出緩沖區(排隊打飯),可以設置一個優先隊列(女生優先)
2、爭搶
- 一旦一個請求獲得資源後,會產生排他鎖(不排隊,誰搶到就是輪到誰打飯)
- 缺點:有可能存在某些請求一直獲得不到資源
3、預處理
- 經常請求的資源可以預處理(緩存)。減少請求占用資源的時間(經常被點的菜品可以提前做)
4、並行
- 通過購買服務器、或多開進程、線程實現並行處理,屬於水平擴展(增加打飯窗口)
5、提速
- 提高單個CPU性能,貨單個服務器安裝更多CPU,屬於垂直擴展(提高單個窗口打飯速度)
6、消息中間件
- 起緩沖作用,常見消息中間件:RabbitMQ、ActiveMQ、RocketMQ、kafka
進程
- 進程間不可隨意共享數據
- 進程是線程的容器
- linux存在父進程、子進程(fork)
線程
- 一個進程可擁有多個線程
- 線程可以共享進程的資源
- 每一個線程都擁有線程ID、當前指令指針、寄存器集合與獨立的堆棧
- 創建線程比創建進程快10-100倍
線程的狀態
ready
- 線程能夠運行,但在等待被調度。可能線程剛剛創建啟動,或剛剛從阻塞中恢復,或者被其他線程搶占
running
- 線程正在運行
blocked
- 線程等待外部事件發生而無法運行,如IO操作
terminated
- 線程完成,或退出被取消
Python線程
- 進程會啟動一個解釋器進程,線程共享一個解釋器進程
Thread類
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
-
target
- 線程調用的對象,就是目標函數
-
name
- 為線程起個名字
-
args
- 為目標函數傳遞實參,參數類型需為元組
-
kwargs
- 為目標函數關鍵字傳參,需為字典
- daemon
- 下文詳述
啟動線程
threading.Thread(target=fn, name=‘fn‘).start()
線程退出
- Python沒有提供線程退出的方法,線程在下面情況時退出
- 1、線程函數內語句執行完畢
- 2、線程函數中拋出未處理異常
Python線程沒有優先級、沒有線程組的概念,不能被銷毀、停止、掛起、恢復、中斷
threading屬性、方法
-
current_thread()
- 返回當前線程對象
-
main_thread()
- 返回主線程對象
-
active_count()
- 當前處於alive狀態的線程個數,包括主線程
-
enumerate()
- 返回所有活著的線程的列表,不包括已經終止的線程和未開始的線程,包括主線程
- get_ident()
- 返回當前線程的ID,非0整數
Thread實例屬性、方法
-
name
- 線程實例的名稱 初始化中的name參數傳入,getName()、setName()可獲取、設置(一般不改)
-
ident
- 線程ID 線程啟動後才有ID,否則為None。線程退出此ID依舊可以訪問。ID可重復使用
-
is_alive()
- 返回線程是否存活
-
start()
- 啟動線程。每個線程只能執行此方法一次
- start()會調用run()
- start函數的實現:
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self._bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
self._started.wait()
-
run()
- 運行線程函數
- 單運行run()不會建立新線程,只是單純的函數調用
- run函數的實現:
try:
if self._target:
self._target(*self._args, **self._kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs
每次調用run都會刪除引用self._target, self._args, self._kwargs
- 示例:
import threading
import time
class MyThread(threading.Thread):
def start(self):
print(‘-----start-----‘)
super().start()
def run(self):
print(‘---------run----------‘)
super().run()
def pr1(lst):
for _ in lst:
print(‘{} is running‘.format(threading.current_thread().name))
def worker():
name = ‘worker{}‘.format(1)
t = MyThread(target=pr1, name=name, args=([1, 2],))
# time.sleep(.5)
t.start()
# time.sleep(1)
t.run()
if __name__ == ‘__main__‘:
worker()
-
結果(可能發生):
- 由於多線程,有時start()中調用run()的del刪除引用來不及,如上圖結果所示在刪除引用前第二次的run()依然可以調用,打印出兩次MainThread is running
多線程
- 一個進程中至少有一個主線程作為程序入口,其他線程稱作工作線程
線程安全
-
線程執行一段代碼,不會產生不確定行結果,那這段代碼是線程安全的
- 示例:
import threading
def worker():
for x in range(100):
print("{} is running.".format(threading.current_thread().name))
for x in range(5):
name="worker{}".format(x)
t=threading.Thread(name=name,target=worker)
t.start()
- 結果
- 原因
- print函數打印時先打印指定的內容再打印默認的換行符,多線程中,一個print的調用中可能會出現線程的切換
- print函數是線程不安全的
如何解決?
- 1.print只處理輸入的字符
- 改為
print("{} is running.\n".format(threading.current_thread().name), end=‘‘)
-
2.logging
-
日誌處理模塊,線程安全
- 示例:
-
import threading
import logging
def worker():
for x in range(100):
logging.warning("{} is running.".format(threading.current_thread().name))
for i in range(1,6):
name="worker{}".format(i)
t=threading.Thread(name=name,target=worker)
t.start()
daemon和non-daemon線程
-
daemon=True
- 當主線程執行完,如果不存在正在執行的non-daemon線程,則強制所有線程同主線程結束
-
daemon=False
- 即使主線程執行完,會等待non-daemon線程執行完畢
- daemon=None
- 與當前調用線程(父線程)的daemon類型相同
daemon必須在start()之前設置,否則引發RuntimeError異常
-
isDaemon()
- 是否是daemon線程
-
setDaemon
- 設置為daemon線程,必須在start之前設置
- join(timeout=None)
- 一個線程中調用另一個線程的join方法,調用者將變為阻塞狀態,直到被調用線程終止
- 一個線程可以被join多次
daemon線程應用場景
- 1.後臺任務,如心跳包、監控
- 2.主線程才有用的線程,如主線程中維護公共資源,當主線程已經清理了,準備退出,而工作線程使用這些資源工作也已經沒有意義。一起退出最合適
threading.local類
- 將這個類實例化得到一個全局對象,但不同的線程使用這個對象存儲的數據其他線程看不見。
threading.Timer
-
繼承自Thread,用來定義多久執行一個函數
- threading.Timer(interval, function, args=None, kwargs=None)
- start方法執行後,Timer對象等待interval時間後,執行function,
- 如果在執行函數之前的等待階段,使用了cancel方法,就會跳過函數執行
線程同步
概念
-
線程同步,線程間協同,通過鎖,讓一個線程訪問某些數據時,其他線程不能訪問這些數據,直到該線程完成對數據的操作
- 不同操作系統實現技術有所不同,有臨界區(Critical Section)、互斥量(Mutex)、信號量(Semaphore)、事件(Event)等
Event
-
是線程間通信機制中最簡單的實現,使用一個內部的標記flag的變化來操作
-
set()
- 將標記設為True
-
clear()
- 將標記設為False
-
is_set()
- 標記是否為True
- wait(timeout=None)
- 設置等待標記為True的時長,None為無限等待,直到等到返回True,未等到超時返回False
使用同一個Event對象的標記flag
不限制等待的個數wait效率優於time.sleep,它會更快切換線程,提高並發效率
Lock
-
一旦線程獲得鎖,其他試圖獲取鎖的線程將被阻塞
-
acquire(blocking=True, timeout=-1)
- 默認阻塞,阻塞可以設置超時時間。非阻塞時,timeout禁止設置。成功獲取鎖,返回True,否則返回False
- release()
- 釋放鎖。可以從任何線程調用釋放
- 已上鎖的鎖,會被重置為unlocked未上鎖的鎖上調用,拋RuntimeError異常
鎖保護了數據完整性,但是性能下降很多
加鎖、解鎖常用語句:
- 1.使用try...finallly語句
- 2.with上下文
鎖的應用場景
- 鎖適用於訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候
- 如果只讀取同一個共享資源不需要鎖
非阻塞鎖
import threading
import time
import logging
FORMAT = ‘%(asctime)-15s\t [%(threadName)s %(thread)8d] %(message)s‘
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker(tasks):
for task in tasks:
time.sleep(0.001)
if task.lock.acquire(False):
logging.info(‘{} {} begin to start‘.format(threading.current_thread(), task.name))
else:
logging.info(‘{} {} is working‘.format(threading.current_thread(), task.name))
class Task:
def __init__(self, name):
self.name = name
self.lock = threading.Lock()
tasks = [Task(‘task-{}‘.format(x)) for x in range(10)]
for i in range(5):
threading.Thread(target=worker, name=‘worker-{}‘.format(i), args=(tasks,)).start()
可重入鎖
- 線程相關鎖
-
線程A獲得可重復鎖,並可以多次成功獲取,不會阻塞。最後要在線程A中做和acquire次數相同的release
- 可重入鎖,與線程相關,可在一個線程中獲取鎖,並可繼續在同一線程中不阻塞獲取鎖。當鎖未釋放完,其他線程獲取鎖就會阻塞,直到當前持有鎖的線程釋放完鎖
Condition
-
Condition(lock=None),可以傳入一個Lock或Rlock對象,默認是Rlock
-
acquire(*args)
- 獲取鎖
-
wait(self, timeout=None)
- 等待或超時
-
notify(n=1)
- 喚醒至多指定數目個數的等待的線程,沒有等待的線程就沒有任何操作
-
notify_all()
- 喚醒所有等待的線程
- 用於生產者、消費者模型,解決生產者消費者速度匹配問題
- 采用了通知機制,非常有效率
使用方式
- 使用Condition,必須先acquire,用完了要release,因為內部使用了鎖,最好的方式是使用with上下文
- 消費者wait,等待通知
- 生產者生產好消息,對消費者發通知,可以使用notify或notify_all
Python第十周 學習筆記(2)_多線程