1. 程式人生 > 實用技巧 >python 多工與資料傳輸

python 多工與資料傳輸

多工

概念:作業系統“同時”執行多個任務

日常中多工無處不在,比如唱歌跳舞,一切皆物件,python如何對這些行為之間的關係進行建模,這個工具就是多工。

重要概念

併發與並行

併發:指的是任務數多於cpu核數,通過作業系統的各種任務排程演算法,實現用多個任務“一起”執行,存在著多個任務共用一個核,因為任務切換很快,所以沒有感覺,像是在一起執行

並行:指的是任務數小於等於cpu核數,即任務真的是一起執行的

多工編碼

(1)執行緒

概念:執行緒是程序中的一個執行任務(控制單元),cpu排程的最小單元

執行緒版唱歌跳舞

import time
import
threading def sing():#自定義方法,使用target引數 for i in range(5): print("在唱歌~~%d" % i) time.sleep(1) def dance(): for i in range(5): print("也在跳舞~~%d" % i) time.sleep(1) def main(): t1 = threading.Thread(target=sing) t2 = threading.Thread(target=dance) t1.start() t2.start()
if __name__ == "__main__": main() import threading import time class SingThread(threading.Thread):#自定義繼承類,繼承Thread def run(self): # 必須有run方法 for i in range(5): time.sleep(1) msg = "在唱歌" + str(i) # name表示當前執行緒的名字,str()把i轉成string型別才能拼接 print
(msg) class DanceThread(threading.Thread): def run(self): # 必須有run方法 for i in range(5): time.sleep(1) msg = "在跳舞" + str(i) # name表示當前執行緒的名字,str()把i轉成string型別才能拼接 print(msg) def main(): t1 = SingThread() t2 = DanceThread() t1.start() t2.start() if __name__ == '__main__': main()
#
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)


執行緒資料

通過grobal引數在一個程序內的所有執行緒共享全域性變數,很方便在多個執行緒間共享資料

缺點就是,執行緒是對全域性變數隨意遂改可能造成多執行緒之間對全域性變數的混亂(即執行緒非安全),解決執行緒非安全的問題要使用鎖

#不加鎖前,全域性變數的計算會出錯
import threading
import time

g_num = 0

def test1(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("----in test1 g_num = %d ----" % g_num)

def test2(num):
    global g_num
    for i in range(num):
        g_num += 1
    print("----in test2 g_num = %d ----" % g_num)

def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))
    t1.start()
    t2.start()
    # 等待兩個執行緒執行完畢
    time.sleep(5)
    print("----in main g_num = %d ----" % g_num)

if __name__ == '__main__':
    main()

如何使用鎖

# 建立鎖
mutex = threading.Lock()# 或者使用Rlock(),Rlock可以在一個執行緒中使用多個鎖定和釋放,就近原則

# 鎖定
mutex.acquire()

# 釋放
mutex.release()

#加鎖後的全域性變數計算正確
import threading
import time

g_num = 0

def test1(num):
    global g_num
    # 上鎖,如果之前沒有被上鎖,此時上鎖成功
    # 如果上鎖之前已經上過鎖,那麼此時堵塞,直到這個鎖被解開
    for i in range(num):
        mutex.acquire()
        g_num += 1
        mutex.release()
    print("----in test1 g_num = %d ----" % g_num)

def test2(num):
    global g_num
    for i in range(num):
        mutex.acquire()
        g_num += 1
        mutex.release()
    print("----in test2 g_num = %d ----" % g_num)


# 建立一個互斥鎖,預設是沒有上鎖的
mutex = threading.Lock()

def main():
    t1 = threading.Thread(target=test1, args=(1000000,))
    t2 = threading.Thread(target=test2, args=(1000000,))
    t1.start()
    t2.start()
    time.sleep(3)
    print("----in main g_num = %d ----" % g_num)

if __name__ == '__main__':
    main()

死鎖

原因:兩個執行緒分別佔有一部分資源並且同時等待對方的資源,就會造成死鎖,在無外力作用下會無限期等待下去。

程序

(1)重要概念

概念:一個程式執行起來後,程式碼+用到的資源 稱之為程序,程序是系統排程和資源分配的一個獨立最小單元

程序的生理週期:

就緒態:執行的條件都已經滿足,正在等在cpu執行

執行態:cpu正在執行其功能

等待態:等待某些條件滿足,例如一個程式sleep了,此時就處於等待態

(2)程序的建立

import multiprocessing
import time

def test1():
    for i in range(5):
        print("t1:%d" % i)
        time.sleep(1)

def test2():
    for i in range(5):
        print("t2:%d" % i)
        time.sleep(1)

def main():
    p1 = multiprocessing.Process(target=test1)
    p2 = multiprocessing.Process(target=test2)
    p1.start()
    p2.start()

if __name__ == '__main__':
    main()
#Process(group=None, target=None, name=None, args=(), kwargs={})

程序常見方法

  • start():啟動子程序例項(建立子程序)
  • is_alive():判斷程序子程序是否還在活著
  • join([timeout]):是否等待子程序執行結束,或等待多少秒
  • terminate():不管任務是否完成,立即終止子程序

程序通訊

程序要通訊要通過第三方資訊佇列程式,可以使用multiprocessing模組的Queue,程序的args引數傳遞的是Queue的物件

import multiprocessing
import time

def sendpro(q):
    data = [11, 22, 33]
    for temp in data:
        q.put(temp)
        time.sleep(1)
    print("資料存入queue佇列中")

def getpro(q):
    while True:
        temp = q.get()
        print(temp)
        if q.empty():
            break

def main():
    # 建立一個queue佇列
    q = multiprocessing.Queue(3)

    p1 = multiprocessing.Process(target=sendpro, args=(q, ))
    p2 = multiprocessing.Process(target=getpro, args=(q, ))
    p1.start()
    p1.join()  # 等待子程序結束
    p2.start()

if __name__ == '__main__':
    main()

程序池

常見方法:

建立程序池:程序池名 = multiprossing.Pool(最大程序數),提示:使用程序池建立的程序是守護主程序的狀態,預設自己通過Process建立的程序是:不守護主程序的狀態

選擇同步或非同步執行任務

同步(一個任務執行完後,另一個任務才執行):程序池名.apply(函式名)

非同步(任務執行不會等待,多個任務一起執行):程序池名.apply_async(函式名)

from multiprocessing import Pool
import os
import time
import random


def worker(msg):
    t_start = time.time()
    print("%s開始執行,程序號為%d" % (msg,os.getpid()))
    time.sleep(random.random())
    t_stop = time.time()
    print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))

po = Pool(3)  # 定義一個程序池,最大程序數3
for i in range(0,10):
    # Pool().apply_async(要呼叫的目標,(傳遞給目標的引數元祖,))
    # 每次迴圈將會用空閒出來的子程序去呼叫目標
    po.apply_async(worker,(i,))#確定程序池內多個任務非同步

print("----start----")
po.close()  # 關閉程序池,關閉後po不再接收新的請求
po.join()  # 等待po中所有子程序執行完成,必須放在close語句之後
print("-----end-----")

協程

概念:多個協程線上程內部執行,共享執行緒的資源 ,是一個比執行緒更小的執行單元,能實現多工

協程在一個執行緒內,一定是併發的

利用yield做協程

寫多個函式,每個函式中都寫yield,函式執行時遇到yield就會阻塞,然後交替著呼叫不同任務的next()方法,這樣就用協程實現了多工

import time

def func1():
    while True:
        print("-----1-----")
        time.sleep(0.1)
        yield

def func2():
    while True:
        print("-----2-----")
        time.sleep(0.1)
        yield

def main():
    t1 = func1()
    t2 = func2()
    while True:
        next(t1)
        next(t2)

if __name__ == '__main__':
    main()

利用greenlet(pip)做協程,手寫程式碼切換

import time
from greenlet import greenlet

def test1():
    while True:
        print("----1----")
        gl2.switch()
        time.sleep(0.1)

def test2():
    while True:
        print("----2----")
        gl1.switch()
        time.sleep(0.1)

gl1 = greenlet(test1)
gl2 = greenlet(test2)

gl1.switch()#切換到協程所在函式或啟動


利用gevent(pip)做協程,io時自動切換

gevent的原理是當一個greenlet遇到 IO(指的是input output 輸入輸出,比如網路、檔案操作等)操作時,比如訪問網路,就自動切換到其他的greenlet,等到 IO 操作完成,再在適當的時候切換回來繼續執行。

import gevent
import time
from gevent import monkey

monkey.patch_all()#使用mokey打補丁

def f(n):
    for i in range(n):
        print(gevent.getcurrent(), i)
        time.sleep(0.5)
#join的一次性載入版joinall
#
spawn(fuc,args,返回一個協程)
gevent.joinall([ 
gevent.spawn(f,
5),
gevent.spawn(f,
5),
gevent.spawn(f,
5),
])