1. 程式人生 > >Python並行處理——多執行緒

Python並行處理——多執行緒

Parallel Processing

並行處理利用多個CPU在同一時間處理多條指令,這是個很有用的技術,但是它帶來的問題也是很多的,比如多個處理器怎麼共享資料?這個會造成很多讀寫衝突,因此需要學會管理這些因素,寫出更強大的程式碼來進行快速並有意義的資料分析。

Mutability

  • 在python中,有些變數像intergers,是不可變的。這意味著他們的值是不可變的。
def add1(x):
    x = x + 1
x = 5
add1(x)
print(x)
'''
5
'''
  • 但是像字典和列表是可變的,可以代表一些會改變的資訊,並行處理中可變變數是很有用的,因為我們並行處理中會共享變數,而且幾個處理器會編輯同一個變數。下面建立一個Counter類,看看變數是如何變化的:
class Counter():
    def __init__(self):
        self.count = 0
    def increment(self):
        self.count += 1
    def get_count(self):
        return self.count

def count_up_100000(counter):
    for i in range(100000):
        counter.increment()

counter = Counter()
initial_count = counter.get_count()
count_up_100000(counter)
final_count = counter.get_count()
print(final_count)
'''
100000
'''

Multithreading

上面那個例子中發現counter這個例項的count值已經變為100000了,我們發現在count_up_100000這個函式中,一直在呼叫Counter的一個方法,對counter的count值進行+1,這個操作實際上可以並行處理,假設我們有100000個CPU,那麼我們一次就可以執行所有迴圈,這就是多執行緒(multithreading)。

  • 一個執行緒(thread)就是一條執行路徑。在Python中我們可以利用threading.Thread()來建立新的執行緒,作為一個獨立的過程來執行一個給定的函式,比如上一個例子中,我們建立新的執行緒來執行count_up_100000這個函式,啟動一個執行緒的方法是thread.start()。當這個執行緒結束執行後才會執行thread.join(),將其join到主執行緒中。
import threading
counter = Counter()
count_thread = threading.Thread(target=count_up_100000, args=[counter])
count_thread.start()
count_thread.join()
after_join = counter.get_count()
print(after_join)
'''
100000
'''

Determinism

  • 大部分單執行緒操作的結果是確定的(deterministic ),因為我們是一步一步執行程式,結果是可以預測的。現在考慮一個情況,你讓你的一個朋友來數數,數到100000,你是主執行緒,建立了這樣一個數數執行緒,然後當它數完就會告訴你,這類似與join主執行緒。但是在他還沒有數完的過程中,count的結果是不確定的(nondeterministic)。
import threading
def conduct_trial():
    counter = Counter()
    count_thread = threading.Thread(target=count_up_100000, args=[counter])
    count_thread.start()
    intermediate_value = counter.get_count()
    count_thread.join()
    return intermediate_value

trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)
'''
24145
24762
24881
'''
  • 在還未join時,也就是執行緒還未結束時,此時的count的值是不確定的。

Enforcing Determinism

  • 互斥鎖同步
    執行緒同步能夠保證多個執行緒安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個執行緒要更改共享資料時,先將其鎖定,此時資源的狀態為“鎖定”,其他執行緒不能更改;直到該執行緒釋放資源,將資源的狀態變成“非鎖定”,其他的執行緒才能再次鎖定該資源。互斥鎖保證了每次只有一個執行緒進行寫入操作,從而保證了多執行緒情況下資料的正確性。其中,鎖定方法acquire可以有一個超時時間的可選引數timeout。如果設定了timeout,則在超時後通過返回值可以判斷是否得到了鎖,從而可以進行一些其他的處理。
  • 同步阻塞
    當一個執行緒呼叫鎖的acquire()方法獲得鎖時,鎖就進入“locked”狀態。每次只有一個執行緒可以獲得鎖。如果此時另一個執行緒試圖獲得這個鎖,該執行緒就會變為“blocked”狀態,稱為“同步阻塞”。直到擁有鎖的執行緒呼叫鎖的release()方法釋放鎖之後,鎖進入“unlocked”狀態。執行緒排程程式從處於同步阻塞狀態的執行緒中選擇一個來獲得鎖,並使得該執行緒進入執行(running)狀態。
import threading
def count_up_100000(counter, lock):
    for i in range(10000):
        lock.acquire()
        for i in range(10):
            counter.increment()
        lock.release()

def conduct_trial():
    counter = Counter()
    lock = threading.Lock()
    count_thread = threading.Thread(target=count_up_100000, args=[counter, lock])
    count_thread.start()
    lock.acquire()
    intermediate_value = counter.get_count()
    lock.release()
    count_thread.join()
    return intermediate_value

trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)
'''
20940
21100
21030
'''
  • 上面這段程式碼將+10鎖起來,也就是獲取的值都是10的倍數,只有+10了這個執行緒才可能釋放,因此無論何時訪問count,都是10的倍數。

Counting Twice

def count_up_100000(counter):
    for i in range(100000):
        counter.increment()

counter = Counter()
def count_up_100000(counter):
    for i in range(100000):
        counter.increment()

counter = Counter()
count_up_100000(counter)
count_up_100000(counter)
final_count = counter.get_count()
print(final_count)
'''
200000
'''

Splitting Our Count Into Two Threads

  • 為了完成上面那個功能,我們其實可以引入兩個執行緒,每個計數100000,然後和就是200000.
def count_up_100000(counter):
    for i in range(100000):
        counter.increment()

def conduct_trial():
    counter = Counter()
    count_thread1 = threading.Thread(target=count_up_100000, args=[counter])
    count_thread2 = threading.Thread(target=count_up_100000, args=[counter])

    count_thread1.start()
    count_thread2.start()
    count_thread1.join()
    count_thread2.join()
    final_count = counter.get_count()
    return final_count

trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)
'''
200000
200000
134250
'''

Atomicity

  • 在上一個操作中我們發現結果是不確定的,counter.increment()函式內部self.count += 1實際包含兩行程式碼: old_count = self.count以及self.count = old_count + 1,因此當一個執行緒執行這個函式的時候,可能只是執行到第一行,就被第二個執行緒呼叫counter.increment()函式,而此時兩個執行緒同時+1,導致這兩個+1操作發生在同一個count上,因此兩次+1變成+1不是+2.因此雖然呼叫了counter.increment()200000次,但是最終的結果可能並不是+200000.
    因此需要採用原子操作(atomic operation)。將這個counter.increment()設定為原子操作,其它執行緒必須等原子操作執行完畢可能執行。A對count+1後B才對其+1,而不會出現前面count=10002時,兩個同時對其+1,結果是10003。
import threading
class Counter():
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()
    def increment(self):
        self.lock.acquire()
        old_count = self.count
        self.count = old_count + 1
        self.lock.release()
    def get_count(self):
        return self.count

def count_up_100000(counter):
    for i in range(100000):
        counter.increment()

def conduct_trial():
    counter = Counter()
    count_thread1 = threading.Thread(target=count_up_100000, args=[counter])
    count_thread2 = threading.Thread(target=count_up_100000, args=[counter])

    count_thread1.start()
    count_thread2.start()

    count_thread1.join()
    count_thread2.join()

    final_count = counter.get_count()
    return final_count

trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)
'''
200000
200000
200000
'''