【python】Python 執行緒呼叫
阿新 • • 發佈:2022-01-05
簡介:
Python 執行緒可以通過主執行緒,呼叫執行緒來執行其他命令,
為Python提供更方便的使用。
- 併發執行緒測試
# 命令呼叫方式 import threading,time # 定義每個執行緒要執行的函式 def run(n): print("task",n) time.sleep(1) # 生成一個執行緒例項target=目標,args=引數 t1 = threading.Thread(target=run,args=("t1",)) t2 = threading.Thread(target=run,args=("t2",)) #啟動執行緒 t1.start() t2.start()
- 併發類的繼承測試
import threading,time # 繼承threading.Thread class MyThread(threading.Thread): # 重寫父類的建構函式 def __init__(self,n): # 重構建構函式 super(MyThread.self).__init__() self.n = n # 定義每個執行緒要執行的函式 def run(self): print("running task ",self.n) # 例項化 t1 = MyThread("t1") t2 = MyThread("t2") # 啟動執行緒 t1.start() t2.start()
- 實現併發序列執行命令
import threading,time def run(n): print("task",n) time.sleep(1) start_time = time.time() for i in range(50): t = threading.Thread(target=run,args=("t-%s"%i,)) t.start() t.join() # 注:預設主執行緒不會等子執行緒執行完在執行。 # 注:序列:等待前一個命令完全執行完,在執行下一條命令。 # 注:例項.join() #wait() 等待執行完畢執行下條命令,可實現序列。
- 實現主執行緒等待子執行緒執行完畢後在往下執行
import threading,time
# 定義每個執行緒要執行的函式
def run(n):
print("task",n)
time.sleep(2)
# 測試執行結果後的用時時間
start_time = time.time()
# 建立列表接收t.start()物件內容。
t_objs = []
# 迴圈50次
for i in range(3):
# 生成一個執行緒例項target=目標,args=引數
t = threading.Thread(target=run,args=("t-%s"%i,))
# 啟動執行緒 呼叫run
t.start()
# 執行結果新增到列表
t_objs.append(t)
# 將每個迴圈t物件一次進行等待。
for t in t_objs:
# 每迴圈一次等待一次
t.join()
# 列印用時時間
print("cost:",time.time() - start_time)
- 守護程序
守護執行緒:依賴於主執行緒存在的,為主執行緒工作。
功能:設定成守護執行緒後,主執行緒關閉,守護執行緒也連帶關閉。
使用場景:socketserver 每一個連結過來,socketserver就會啟動一
個執行緒,當服務端關閉時,需要關閉其他執行緒,就可以用
到守護執行緒,關閉。
```
import threading,time
定義每個執行緒要執行的函式
def run(n):
print("task",n)
time.sleep(5)
建立列表接收t.start()物件內容。
t_objs = []
迴圈3次
for i in range(3):
# 生成一個執行緒例項target=目標,args=引數
t = threading.Thread(target=run,args=("t-%s"%i,))
# 要在start之前設定
# 把當前執行緒設定為守護執行緒
t.setDaemon(True)
# 啟動執行緒 呼叫run
t.start()
注:所有程式執行完畢,還有一個join等待,執行完畢退出程式。
注:主執行緒MainThread,執行緒Thread。
threading.current_thread() #測試當前活動的執行緒型別。
threading.active_count() #測試當前活動的執行緒數。