1. 程式人生 > 其它 >python 基礎知識梳理——Python中的多程序和多執行緒

python 基礎知識梳理——Python中的多程序和多執行緒

技術標籤:Python學習python多執行緒多程序資料結構

python 基礎知識梳理——Python中的多程序和多執行緒

1. 引言

  • 程序是作業系統資源分配的基本單位,而執行緒是任務排程的基本單位。
  • 一個應用程式至少包括一個程序,而一個程序至少包括一個執行緒,執行緒的尺度更小
  • 每個程序在執行過程中都擁有獨立的記憶體單元,而每個程序的多個子執行緒共享同一個記憶體單元

2. 多程序

2.1 Python中的多程序與multiprocess模組

Python中的多程序程式設計主要依靠的是multiprocess模組,現在我們對比兩個任務,一個採用多程序,另一個不採用多程序,輸出執行時間。

import time
import os

def use_time_task():
    print('當前[子]程序為:{}'.format(os.getpid()))
    time.sleep(2)
    print('結果:{}'.format(10 ** 24))

if __name__ == '__main__':
    print('當前[母]程序為:{}'.format(os.getpid()))
    start = time.perf_counter()
    for i in range(2):
        use_time_task()
    end =
time.perf_counter() print('耗時為 {} 秒'.format(end-start)) # 輸出 當前[]程序為:37016 當前[]程序為:37016 結果:1000000000000000000000000 當前[]程序為:37016 結果:1000000000000000000000000 耗時為 4.007984525

可以看出,計算時間為4秒,子程序和母程序相同,此時只有一個程序參與計算。

下面我們採用2個程序計算該任務。

import time
import os
import multiprocessing

def use_time_task(i):
    print('當前[子]程序為:{}--任務{}'.format(os.getpid(
),i)) time.sleep(2) print('結果:{}'.format(10 ** 24)) if __name__ == '__main__': print('當前[母]程序為:{}'.format(os.getpid())) start = time.perf_counter() task1 = multiprocessing.Process(target=use_time_task,args=(1,)) task2 = multiprocessing.Process(target=use_time_task,args=(2,)) print('所有子程序完成') task1.start() task2.start() task1.join() task2.join() end = time.perf_counter() print('耗時為 {} 秒'.format(end-start)) # 輸出 當前[]程序為:37055 所有子程序完成 當前[]程序為:37056--任務1 當前[]程序為:37057--任務2 結果:1000000000000000000000000 結果:1000000000000000000000000 耗時為 2.0143263890000003

可以看到,多程序的耗時僅為單個程序的一半,併發執行的時間確實要少很多。雖然我們就建立了一個程序,但是1個母程序中卻包含2個子程序,我們在程式碼中採用了join()就是為了讓母程序阻塞,等所有的子程序完成後才打印總耗時。

2.1.2 總結

  1. 新的程序建立和切換都是要消耗資源的,且程序數受制於CPU的核心數,一般不能太大
  2. 程序之間的記憶體空間是獨立的,不方便互相通訊(其實我們可以使用Queue([maxsize])命令還構建程序通訊)
  3. 除了Process方法外,我們還可以通過Pool類建立多程序

那麼,我們來看看如何用Pool類如何建立多執行緒吧。

2.2 利用multiprocess模組的Pool類建立多程序

很多時候,系統需要建立多個程序以提高CPU的利用率,當數量少時,我們可以手動建立,但是程序數量很多的時候,執行緒池Pool就發揮作用了,我們可以傳遞引數限制併發的數量,預設值為CPU的核心數。

Pool類會提供指定數量的程序供使用者呼叫,當新的請求提交到Pool中時,如果程序池沒有滿,就會建立一個新的程序來執行請求,如果池滿就會告知先等待,直到執行緒池中有程序結束,才會建立新的程序來執行這些請求。

2.2.1 multiprocess.pool的幾個方法:

1.apply.async

函式原型:apply_async(self, func, args=(), kwds={}, callback=None,error_callback=None):

作用:向程序池提交需要執行的函式及引數,各個程序採用非阻塞(非同步)的呼叫方式,每個程序制只管執行自己的,不管其他程序是否完成(預設方式)

2.map()

函式原型:map(self, func, iterable, chunksize=None)

作用:Pool類的map方法,與內建的map函式用法基本一致,它會使得程序阻塞(同步)知道返回結果。

注意:雖然第二個引數是迭代器,但在實際應用中,必須在整個佇列就緒後,程式才會返回執行的子程序。

3.map_async()

函式原型:map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):

作用:與map()函式作用一致,但是它是非阻塞(非同步)的。

4.close()

作用:關閉執行緒池,不再接受新的任務。

5.terminate()

作用:結束工作程序,不再處理未處理的任務。

6.join()

作用:主程序阻塞等待子程序的退出,join()方法要在close()terminate()之後使用

2.2.2舉例

from multiprocessing import Pool
import os
import time

def use_time_task(i):
    print('當前[子]程序為:{}--任務{}'.format(os.getpid(),i))
    time.sleep(2)
    print('結果:{}\n'.format(10 ** 24))

if __name__ == '__main__':
    print('cpu核心數:{}'.format(os.cpu_count()))
    print('當前[母]程序為:{}'.format(os.getpid()))
    start = time.perf_counter()
    p = Pool(4)
    for i in range(5):
        p.apply_async(use_time_task,args=(i,))
    print('等待所有子程序完成')
    p.close()
    p.join()# 主程序阻塞等待子程序退出
    end = time.perf_counter()
    print('耗時為 {} 秒'.format(end-start))
# 輸出
cpu核心數:4
當前[]程序為:37299
等待所有子程序完成
當前[]程序為:37300--任務0
當前[]程序為:37301--任務1
當前[]程序為:37302--任務2
當前[]程序為:37303--任務3
結果:1000000000000000000000000
結果:1000000000000000000000000
結果:1000000000000000000000000
結果:1000000000000000000000000
當前[]程序為:37302--任務4
結果:1000000000000000000000000
耗時為 4.158054153
2.2.3 總結

Pool物件呼叫join()方法會等待所有程序執行完畢,呼叫join()之前必須先呼叫close()terminate()方法,讓它不再接受其他Process。

由於我的MacBook Pro13 僅有4核心,所以一次只能呼叫一個容量為4的程序池,4個程序計算5次,必須等待所有程序計算結束才能分配剩下的一次任務給一個程序。

在Python中,由於有GIL(全域性直譯器鎖)的存在,它的作用是保證同一時間只有一個執行緒可以執行程式碼,很多人,包括我之前也認為Python中的多執行緒其實不是真正的多執行緒,如果想要充分的利用多核CPU仍然需要使用多程序,其餘細節我會在部落格的後續中續繼續討論。

2.3 多程序之間的資料共享

通常來說,程序之間是相互獨立的,每個程序都有獨立的記憶體。多程序共享必然會導致程序間的相互競爭,所以要盡最大可能防止使用共享狀態。

還有一種方法是使用佇列queue來實現程序之間的通訊和資料共享,下面這個例子中,我們建立2個程序,一個負責寫,一個負責讀,實現共享一個佇列queue。

from multiprocessing import Process,Queue
import os,time,random

# 寫程序資料
def write_process(q):
    print('程序{}正在寫入'.format(os.getpid()))
    for value in ['A','B','C']:
        print('把[{}]放入佇列'.format(value))
        q.put(value)
        time.sleep(random.random())

# 讀程序資料
def read_process(q):
    print('程序{}正在讀取'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('在佇列中得到程序[{}]'.format(value))

if __name__ == '__main__':
    # 父程序建立Queue,並傳遞給各個子程序
    q = Queue()
    pw = Process(target=write_process,args=(q,))
    pr = Process(target=read_process,args=(q,))
    # 啟動子程序pw
    pw.start()
    # 啟動子程序or
    pr.start()
    # 等待pw結束
    pw.join()
    # pr裡是死迴圈,直接強行終止
    pr.terminate()

# 輸出
程序37664正在寫入
把[A]放入佇列
程序37665正在讀取
在佇列中得到程序[A][B]放入佇列
在佇列中得到程序[B][C]放入佇列
在佇列中得到程序[C]

3. 多執行緒

Python中的多執行緒其實是在同一時刻執行多個不同程式,多執行緒的優點很多:

  1. 使用執行緒可以把佔時程式的任務放到後臺去處理
  2. 程式的執行速度加快
  3. 在一些等待服務,例如:使用者註冊、簡訊驗證碼和網路收發資料等等,使用執行緒可以明顯的改善記憶體佔用的問題

每個獨立的執行緒都有一個程式入口、順序執行序列和程式的出入口,但是執行緒不能夠獨立的執行,必須依賴於應用程式,由應用程式提供的多個執行緒執行控制。

3.1 Python中的多執行緒與threading模組

Python3中的多執行緒一般使用threading模組,在Python2中使用的是thread模組,該模組在Python3中已經被棄用,為了相容性,Python3中把thread模組重新命名為_thread

threading.Thread模組可以接收兩個引數,一個是target,一般指向函式名,另一個是args,用來傳入函式需要的引數。呼叫start()方法啟動多執行緒,還可以使用current_thread().name列印當前執行緒的名字,下面我們採用之前多程序中採用的例子,把它用執行緒來實現。

import threading
import time

def use_time_task(i):
    print('當前[子]執行緒為:{}--任務{}'.format(threading.current_thread().name,i))
    time.sleep(2)
    print('結果:{}\n'.format(10 ** 24))

if __name__ == '__main__':
    start = time.perf_counter()
    print('當前[主]執行緒為:{}'.format(threading.current_thread().name))
    t1 = threading.Thread(target=use_time_task,args=(1,))
    t2 = threading.Thread(target=use_time_task,args=(2,))
    t1.start()
    t2.start()
    end = time.perf_counter()
    print('總耗時{}秒'.format(end-start))
# 輸出
當前[]執行緒為:MainThread
當前[]執行緒為:Thread-1--任務1
當前[]執行緒為:Thread-2--任務2
總耗時0.0028649530000000034秒
結果:1000000000000000000000000
結果:1000000000000000000000000

很明顯,發生了和多程序中一樣的問題,總耗時並沒有真正計算到子執行緒運算的時間,直接把主執行緒運算時間給輸出了,我們必須要使用join()方法,程式碼如下:

import threading
import time

def use_time_task(i):
    print('當前[子]執行緒為:{}--任務{}'.format(threading.current_thread().name,i))
    time.sleep(2)
    print('結果:{}\n'.format(10 ** 24))

if __name__ == '__main__':
    start = time.perf_counter()
    print('當前[主]執行緒為:{}'.format(threading.current_thread().name))
    t1 = threading.Thread(target=use_time_task,args=(1,))
    t2 = threading.Thread(target=use_time_task,args=(2,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()# 等待t2完成後再計算時間
    end = time.perf_counter()
    print('總耗時{}秒'.format(end-start))
# 輸出
當前[]執行緒為:MainThread
當前[]執行緒為:Thread-1--任務1
當前[]執行緒為:Thread-2--任務2
結果:1000000000000000000000000
結果:1000000000000000000000000
總耗時2.00421429

我們再讓程式碼簡化一些,不要手動新增join()

import threading
import time

def use_time_task(i):
    print('當前[子]執行緒為:{}--任務{}\n'.format(threading.current_thread().name,i))
    time.sleep(2)
    print('結果:{}\n'.format(10 ** 24))

if __name__ == '__main__':
    start = time.perf_counter()
    print('當前[主]執行緒為:{}\n'.format(threading.current_thread().name))
    thread_list = []
    for i in range(1,3):
        t = threading.Thread(target=use_time_task,args=(i,))
        thread_list.append(t)
    print('thread_list中含有的執行緒為:{}'.format(thread_list))

    for t in thread_list:
        t.start()

    for t in thread_list:
        t.join()

    end = time.perf_counter()
    print('總耗時{}秒'.format(end-start))
# 輸出
當前[]執行緒為:MainThread
thread_list中含有的執行緒為:[<Thread(Thread-1, initial)>, <Thread(Thread-2, initial)>]
當前[]執行緒為:Thread-1--任務1
當前[]執行緒為:Thread-2--任務2
結果:1000000000000000000000000
結果:1000000000000000000000000
總耗時2.005684626

當我們設定多執行緒時,主執行緒會建立多個子執行緒,如果希望主執行緒等待子執行緒實現執行緒同步,那麼我們就需要使用join()方法,如果我們希望一個主執行緒結束後就不再執行子執行緒,那麼我們就需要使用setDaemon(True)(守護程序)來實現。

import threading
import time

def use_time_task(i):
    print('當前[子]執行緒為:{}--任務{}\n'.format(threading.current_thread().name,i))
    time.sleep(2)
    print('結果:{}\n'.format(10 ** 24))

if __name__ == '__main__':
    start = time.perf_counter()
    print('當前[主]執行緒為:{}\n'.format(threading.current_thread().name))
    for i in range(5):
        t = threading.Thread(target=use_time_task,args=(i,))
        t.setDaemon(True)
        t.start()
    end = time.perf_counter()
    print('總耗時{}秒'.format(end-start))
# 輸出
當前[]執行緒為:MainThread
當前[]執行緒為:Thread-1--任務0
當前[]執行緒為:Thread-2--任務1
當前[]執行緒為:Thread-3--任務2
當前[]執行緒為:Thread-4--任務3
當前[]執行緒為:Thread-5--任務4
總耗時0.0010438910000000051

3.2 繼承Thread類來重寫run方法建立新程序

除了使用threading.Thread()方法來建立新的執行緒外,還可以通過繼承Thread類重寫run方法來建立新的執行緒。

import threading
import time

def use_time_task(i):
    time.sleep(2)
    return 10 ** 24

class MyThread(threading.Thread):
    def __init__(self, func, args, name=' '):
        threading.Thread.__init__(self)
        self.func = func
        self.args = args
        self.name = name
        self.result = None

    def run(self):
        print('開始[子]程序{}\n'.format(self.name))
        self.result = self.func(self.args[0],)
        print('結果為:{}\n'.format(self.result))
        print('{}子程序結束'.format(self.name))

if __name__ == '__main__':
    start = time.time()
    threads = []
    for i in range(1,3):
        t = MyThread(use_time_task,args=(i,),name=str(i))
        threads.append(t)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    end = time.time()
    print('總耗時{}秒'.format(end-start))
# 輸出
開始[]程序1
開始[]程序2
結果為:1000000000000000000000000
1子程序結束
結果為:1000000000000000000000000
2子程序結束
總耗時2.0061709880828857

通過繼承threading.Thread類,我們定義了一個新的MyThread().run()方法,通過該類的例項化建立了2個子執行緒。

3.3 不同執行緒之間的資料共享

之前我們提到,執行緒之間共享同一部分記憶體空間,那麼意味著任何一個執行緒都可以改變摸一個變數,因此執行緒之間的共享資料最大危險在於多個執行緒同時修改一個變數。針對這種情況,我們可以使用threading.lock()方法對一個共享變數進行鎖定,修改完後供其他執行緒使用。

import threading
import time

class Account:
    def __init__(self):
        self.balance = 0
    def add(self,lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0,100000):
            self.balance += 1
        # 釋放鎖
        lock.release()
    def delete(self,lock):
        # 獲得鎖
        lock.acquire()
        for i in range(0,100000):
            self.balance -= 1
        # 釋放鎖
        lock.release()

if __name__ == '__main__':
    account = Account()
    lock = threading.Lock()
    # 建立執行緒
    thread_add = threading.Thread(target=account.add,args=(lock,),name='Add')
    thread_delete = threading.Thread(target=account.delete,args=(lock,),name='Delete')

    # 啟動執行緒
    thread_add.start()
    thread_delete.start()
    # 執行緒等待
    thread_add.join()
    thread_delete.join()

    print('最終的balance為:{}'.format(account.balance))
# 輸出
最終的balance為:0

顯然,新增的執行緒和刪除的執行緒相互抵消,最終的balance為0。

3.4 使用queue佇列通訊

下面的例子中,建立了2個執行緒,一個用於生成,另一個用於消費,生成的產品放在queue裡,實現了不通的執行緒溝通。

from queue import Queue
import random,threading,time

# 生產者類
class Producer(threading.Thread):
    def __init__(self, name ,queue):
        threading.Thread.__init__(self,name=name)
        self.queue = queue

    def run(self):
        for i in range(1,5):
            print('{} is producing {} to queue!'.format(self.getName(),i))
            self.queue.put(i)
            time.sleep(random.randrange(10)/5)
        print('{} finished! '.format(self.getName()))

# 消費者類
class Customer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self,name=name)
        self.queue = queue

    def run(self):
        for i in range(1,5):
            val = self.queue.get()
            print('{} is consuming {} in queue'.format(self.getName(),val))
            time.sleep(random.randrange(10))
        print('{} finished !'.format(self.getName()))

def main():
    queue = Queue()
    producer = Producer('Producer',queue)
    customer = Customer('customer',queue)

    producer.start()
    customer.start()

    producer.join()
    customer.join()

    print('全部執行緒 完成!')

if __name__ == '__main__':
    main()
# 輸出
Producer is producing 1 to queue!
customer is consuming 1 in queue
Producer is producing 2 to queue!
Producer is producing 3 to queue!
Producer is producing 4 to queue!
Producer finished! 
customer is consuming 2 in queue
customer is consuming 3 in queue
customer is consuming 4 in queue
customer finished !
全部執行緒 完成!

上面的例子中,佇列queueput方法可以將一個物件obj放入佇列中,如果佇列滿了,則將阻塞直到佇列有空間可用為止,queueget方法則會一次返回佇列中的一個成員,如果佇列為空,則將阻塞直到佇列有種成員為止。

同時,queue還有empty()full()方法,用來檢視佇列的空、滿狀態。

4.總結

  • 對於CPU密集型程式碼——多程序效率更高
  • 對於IO密集型程式碼(檔案操作、爬蟲等)——多執行緒效率高

對於IO密集型應用,大部分時間消耗是用於等待,在檔案或爬蟲中,即使是多CPU也不能完全被利用起來,多程序效率必然不高。

當Python遇到IO密集型程式碼時,會釋放GIL供新的執行緒使用,實現了執行緒之間的切換。






博文的後續更新,請關注我的個人部落格:星塵部落格