1. 程式人生 > 實用技巧 >Python執行緒/程序

Python執行緒/程序

GIL

GIL:global interpreter lock全域性鎖
限制:使得python中一個執行緒對應於C語言中的一個執行緒。同一時刻只有一個執行緒執行在一個CPU上執行位元組碼,無法將多執行緒執行到多CPU上。
釋放:根據位元組碼執行行數或者時間片來釋放鎖,或者遇到io操作時會主動釋放。
顯示位元組碼:from dis import dis

區別

  • CPU操作(例如運算)採用多程序更加具有優勢(GIL使得多執行緒沒有辦法在不同的CPU上執行)
  • IO操作採用多執行緒更具有優勢(當碰見IO操作時,會主動釋放GIL)

ThreadLocal

使用場景:多執行緒環境下,如果使用全域性變數,需要加鎖。但是如果每個執行緒都需要擁有自己的私有資料時,即這個資料對其他執行緒不可見。可以使用ThreadLocal

變數,它本身是一個全域性變數,但是每個執行緒卻可以利用它來儲存屬於自己的私有資料。

import threading

global_data = threading.local()

def show():
    print threading.current_thread().getName(), global_data.num

def thread_cal():
    global_data.num = 0
    for _ in xrange(1000):
        global_data.num += 1
    show()

threads = []
for i in range(10):
    threads.append(threading.Thread(target=thread_cal))
    threads[i].start()

join()setDaemon()

join:主執行緒啟動若干個子執行緒後,可以繼續執行主執行緒的程式碼,也可以等待所有子執行緒執行完畢後繼續執行主執行緒。如果子執行緒呼叫了join方法,則表明主執行緒必須等子執行緒執行完了才可以往下繼續執行。因此,如果需要所有的子執行緒都能再主執行緒結束前被執行完畢,則必須為每一個子執行緒都註冊join
setDaemon():設定此執行緒是否被主執行緒守護回收。需要在start方法前呼叫,設為True時,當主執行緒結束之後,會回收此子執行緒。預設是False,即主執行緒結束時不會回收子執行緒。

建立執行緒

Threading模組

# -*- coding: utf-8 -*-
import time
import threading 

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(2)
    print("get detail url end")


if __name__ == "__main__":
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))
    start_time = time.time()
    thread1.start()
    thread2.start()
    print("last time: {}".format(time.time() - start_time))

派生Thread的子類,並建立例項

需要過載run方法

class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(2)
        print("get detail url end")



if __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()
    print("last time: {}".format(time.time() - start_time))

Queue

使用佇列,可以進行阻塞操作。它是基於deque開發的,因而是執行緒安全的。
初始化:

import Queue
msg_deque = Queue.Queue()

放入佇列:

msg_deque.put(data)

取出佇列,沒有時間預設阻塞等待:

msg_deque.get(block=True)

msg_deque.task_done():每次從queueget一個數據之後,當處理好相關問題,最後呼叫該方法,以提示msg_deque.join()是否停止阻塞。讓執行緒向前執行或者退出:

import queue

q = queue.Queue(2)
q.put(15)
q.put(59)

q.get()
q.task_done()  # get取完佇列中的一個值後,使用task_done方法告訴佇列,我已經取出了一個值並處理完畢
q.get()
q.task_done()

q.join()

如果沒有task_done,那麼會一直阻塞在join

佇列型別:

  • 先進先出佇列:queue.Queue
  • 後進先出佇列:queue.LifoQueue
  • 優先順序佇列:queue.PriorityQueue
  • 雙向佇列:queue.deque

原始碼分析:

def put(self, item, block=True, timeout=None):
   # self.not_full是一個條件變數,如果size滿足的話,直接進行put操作(append),並且釋放鎖給get
   # 否則等待not_full(由get中的notify觸發)
    with self.not_full:
        if self.maxsize > 0:
            if not block:
                if self._qsize() >= self.maxsize:
                    raise Full
            elif timeout is None:
                while self._qsize() >= self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while self._qsize() >= self.maxsize:
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()

get同理,如果size為空,則等待not_empty,否則通知put可進行操作:

def get(self, block=True, timeout=None):
    with self.not_empty:
        if not block:
            if not self._qsize():
                raise Empty
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = time() + timeout
            while not self._qsize():
                remaining = endtime - time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()
        self.not_full.notify()
        return item

同步機制

除了Lock之外,還存在其他的鎖

RLock:可重入鎖,允許在同一執行緒中被呼叫多次,但是acquirerelease的數量應當相等。
Condition:條件同步機制,一個執行緒等待特定條件,另一個執行緒發出特定條件滿足的訊號。具有waitnotify兩種方法。Condition實際上有兩把鎖,當使用wait方法時,會釋放最外層的鎖,並分配一把鎖,加至__waiters__這個佇列裡,只有當呼叫notify方法時,才會從__waiters方法中釋放鎖。
Semaphore:通過計數器限制可同時執行的執行緒數量。acquire()遞減計數器,release()則是增加計數器。

# -*- coding: utf-8 -*-
import threading

class Client(threading.Thread):
    def __init__(self, cond):
        super().__init__(name='client')
        self.cond = cond
    def run(self):
        with self.cond:
            self.cond.wait()
            print("client saying")
            self.cond.notify()


class Server(threading.Thread):
    def __init__(self, cond):
        super().__init__(name='server')
        self.cond = cond
    def run(self):
        with self.cond:
            print("server saying")
            self.cond.notify()
            self.cond.wait()


if __name__ == "__main__":
    cond = threading.Condition()
    server = Server(cond)
    client = Client(cond)
    client.start()
    server.start()

condition原始碼分析:

def wait(self, timeout=None):
    if not self._is_owned():
       raise RuntimeError("cannot wait on un-acquired lock")
   # 獲得一個內部鎖
   waiter = _allocate_lock()
   waiter.acquire()
   self._waiters.append(waiter)
   # 釋放condition鎖
   saved_state = self._release_save()
   gotit = False
   try:    # restore state no matter what (e.g., KeyboardInterrupt)
       if timeout is None:
           # 等待notify釋放鎖才能再次進入
           waiter.acquire()
           gotit = True
       else:
           if timeout > 0:
               gotit = waiter.acquire(True, timeout)
           else:
               gotit = waiter.acquire(False)
       return gotit
   finally:
      # 恢復condition鎖
       self._acquire_restore(saved_state)
       if not gotit:
           try:
               self._waiters.remove(waiter)
           except ValueError:
               pass

Semaphore:主要用於控制獲取鎖的數量

# -*- coding: utf-8 -*-
import threading


class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        import time
        time.sleep(2)
        print('success!')
        self.sem.release()

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_thread = HtmlSpider('http://www.baidu.com', self.sem)
            html_thread.start()


if __name__ == '__main__':
    sem = threading.Semaphore(3)
    url_procuder = UrlProducer(sem)
    url_procuder.start()

Event:一個執行緒通知事件,其他執行緒等待事件。內建了一個初始為False的標誌,當呼叫set()時為True,呼叫clear()時重置為False。包括一下方法:

  • isSet():當內建標誌為True時返回True
  • set():將內建標誌位置為True
  • clear():將標誌設為False
  • wait[timeout]:如果標誌位為True將立即返回,否則阻塞執行緒至等待阻塞狀態,等待其他執行緒呼叫set()。當timeout不為空時,如果發生了超時,則返回值為False,否則返回狀態為True

舉個例子:

# -*- coding: utf-8 -*-
import threading
import time

event = threading.Event()

def func():
    # 等待事件,進入等待阻塞狀態
    print '%s wait for event...' % threading.currentThread().getName()
    event.wait()

    # 收到事件後進入執行狀態
    print '%s recv event.' % threading.currentThread().getName()

t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()

time.sleep(2)

# 傳送事件通知
print 'MainThread set event.'
event.set()

在實際中,通常為每個執行緒準備一個獨立的Event,而不是多個執行緒共享。應用場景,當用戶成功上線了,才開啟接收訊息的執行緒。

concurrent.futures

執行緒池

使用標準庫的from concurrent.futures下的ThreadPoolExecutor
主執行緒可以獲取某一個執行緒的狀態或者某一個任務的狀態(done),以及返回值(result)。

from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
    time.sleep(2)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 建立一個最大可容納2個task的執行緒池
future1 = pool.submit(return_future_result, ("hello"))  # 往執行緒池裡面加入一個task
future2 = pool.submit(return_future_result, ("world"))  # 往執行緒池裡面加入一個task
print(future1.done())  # 判斷task1是否結束
time.sleep(3)
print(future2.done())  # 判斷task2是否結束
print(future1.result())  # 檢視task1返回的結果
print(future2.result())  # 檢視task2返回的結果

ThreadPoolExecutor(pool_count)pool_count代表建立執行緒的數量,會返回一個該執行緒池的執行者物件,這個物件的submit()方法和map()方法,能夠使用執行緒池中的執行緒來執行我們指定的方法,並且返回一個Future物件。並且當執行緒結束之後會自動歸還,如果執行緒全部被佔用,則會阻塞。
如果需要新增引數:

def return_future_result(message, n):
    time.sleep(n)
    return message
pool = ThreadPoolExecutor(max_workers=2)  # 建立一個最大可容納2個task的執行緒池
future1 = pool.submit(return_future_result, "hello", 2)  # 往執行緒池裡面加入一個task
future2 = pool.submit(return_future_result, "world", 1)  # 往執行緒池裡面加入一個task

通過as_completed()方法獲取已經完成的任務的結果:

for future in as_completed(all_tasks):
    print(future.result())

或者使用map

for data in pool.map(get_url, urls):
    print(data)

as_completed不同的是,map返回的不是future型別,同時輸出順序也與urls一致。

主要有以下方法:

  • submit:立即返回
  • result:獲取執行結果
  • cancel:取消task(未開始執行的),成功返回True,失敗返回False
  • as_completed:是一個生成器,先完成的task先返回,返回的是future型別
  • map:根據傳入的順序進行返回,返回的直接是future.result()
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time

def get_url(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)
# task1 = executor.submit(get_url, (3))
# task2 = executor.submit(get_url, (2))

# print(task1.done())
# print(task2.cancel())

urls = [3, 2, 4]
all_task = [executor.submit(get_url, (url)) for url in urls]
wait(all_task)
print("all task done")
# for future in as_completed(all_task):
#     data = future.result()
#     print("get page {}".format(data))

# for future in executor.map(get_url, urls):
#     print("get page {}".format(future))

上面演示了兩種方法進行多執行緒下載的方式,一種是利用as_completed(),另一種是利用ThreadPoolExecutor.map()
通過ThreadPoolExecutor.submit()方法返回的是futures物件,作為as_completed的引數,因而可以呼叫其result方法,並且不會發生阻塞(因為as_completed()實際上是一個生成器)。
而針對後者,返回值是一個迭代器,通過其__next__方法呼叫各個futuresresult方法,因此得到的是futures的結果
如果需要提交任務的函式是一樣的,就可以簡化成map。但是加入提交的任務函式不一樣的,或者執行的過程之可能出現異常,就要用到submit

流程圖:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+
  • executor.map會建立多個_WorkItem物件,每個物件都傳入了新建立的一個Future物件。
  • 把每個_WorkItem物件然後放進一個叫做「Work Items」dict中,鍵是不同的「Work Ids」
  • 建立一個管理「Work Ids」佇列的執行緒「Local worker thread」,它能做2件事:
    • 「Work Ids」佇列中獲取Work Id, 通過「Work Items」找到對應的_WorkItem。如果這個Item被取消了,就從「Work Items」裡面把它刪掉,否則重新打包成一個_CallItem放入「Call Q」這個佇列。executor的那些程序會從佇列中取_CallItem執行,並把結果封裝成_ResultItems放入「Result Q」佇列中。
    • 「Result Q」佇列中獲取_ResultItems,然後從「Work Items」更新對應的Future物件並刪掉入口。

程序池

multiprocess中的方法與多執行緒的比較類似:

  • apply_async:非阻塞的,與之對應的是apply是阻塞的。所以需要使用join,否則在子程序還沒開始執行之前主程序就已經完成退出了。
  • close:關閉pool,使其不再接受新的任務
  • terminate:結束工作程序,不再處理未完成的任務
  • join:主程序阻塞,等待子程序的退出,join方法要在closeterminate之後使用。
# -*- coding: utf-8 -*-
import multiprocessing
import time

def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == '__main__':
    pool  = multiprocessing.Pool(multiprocessing.cpu_count())
    results = []
    for i in range(4):
        results.append(pool.apply_async(get_html, args=(i,)))
    pool.close()
    pool.join()
    for result in results:
        print(result.get())

輸出結果:

sub_progress success
sub_progress success
sub_progress success
sub_progress success
0
1
2
3

程序間通訊

Queue:不能使用from queue import Queue,這種方式只適用於同一程序下不同執行緒之間的共享,在多程序程式設計中,每一個程序中都會產生一個相應的副本。應該使用multiprocess.Queue。但是,這個Queue不能用於程序池。執行緒池中應該使用Manager().Queue()

# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Queue
from threading import Thread
# from queue import Queue

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)


if __name__ == '__main__':
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_consumer.start()
    my_producer.start()
    my_producer.join()
    my_consumer.join()

Manager()中定義了很多共享變數。

Pipe:只適用於兩個程序之間的通訊,但是複雜度沒有Queue高。

# -*- coding: utf-8 -*-
import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe

def producer(pipe):
    pipe.send("a")

def consumer(pipe):
    print(pipe.recv())


if __name__ == '__main__':
    send_pipe, recv_pipe = Pipe()
    my_producer = Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(recv_pipe, ))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()