1. 程式人生 > 程式設計 >Python 如何建立一個執行緒池

Python 如何建立一個執行緒池

問題

你建立一個工作者執行緒池,用來響應客戶端請求或執行其他的工作。

解決方案

concurrent.futures 函式庫有一個 ThreadPoolExecutor 類可以被用來完成這個任務。 下面是一個簡單的TCP伺服器,使用了一個執行緒池來響應客戶端:

from socket import AF_INET,SOCK_STREAM,socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock,client_addr):
  '''
  Handle a client connection
  '''
  print('Got connection from',client_addr)
  while True:
    msg = sock.recv(65536)
    if not msg:
      break
    sock.sendall(msg)
  print('Client closed connection')
  sock.close()

def echo_server(addr):
  pool = ThreadPoolExecutor(128)
  sock = socket(AF_INET,SOCK_STREAM)
  sock.bind(addr)
  sock.listen(5)
  while True:
    client_sock,client_addr = sock.accept()
    pool.submit(echo_client,client_sock,client_addr)

echo_server(('',15000))

如果你想手動建立你自己的執行緒池, 通常可以使用一個Queue來輕鬆實現。下面是一個稍微不同但是手動實現的例子:

from socket import socket,AF_INET,SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
  '''
  Handle a client connection
  '''
  sock,client_addr = q.get()
  print('Got connection from',client_addr)
  while True:
    msg = sock.recv(65536)
    if not msg:
      break
    sock.sendall(msg)
  print('Client closed connection')

  sock.close()

def echo_server(addr,nworkers):
  # Launch the client workers
  q = Queue()
  for n in range(nworkers):
    t = Thread(target=echo_client,args=(q,))
    t.daemon = True
    t.start()

  # Run the server
  sock = socket(AF_INET,client_addr = sock.accept()
    q.put((client_sock,client_addr))

echo_server(('',15000),128)

使用 ThreadPoolExecutor 相對於手動實現的一個好處在於它使得 任務提交者更方便的從被呼叫函式中獲取返回值。例如,你可能會像下面這樣寫:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
  u = urllib.request.urlopen(url)
  data = u.read()
  return data

pool = ThreadPoolExecutor(10)
# Submit work to the pool
a = pool.submit(fetch_url,'http://www.python.org')
b = pool.submit(fetch_url,'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()

例子中返回的handle物件會幫你處理所有的阻塞與協作,然後從工作執行緒中返回資料給你。 特別的,a.result() 操作會阻塞程序直到對應的函式執行完成並返回一個結果。

討論

通常來講,你應該避免編寫執行緒數量可以無限制增長的程式。例如,看看下面這個伺服器:

from threading import Thread
from socket import socket,SOCK_STREAM

def echo_client(sock,client_addr)
  while True:
    msg = sock.recv(65536)
    if not msg:
      break
    sock.sendall(msg)
  print('Client closed connection')
  sock.close()

def echo_server(addr,nworkers):
  # Run the server
  sock = socket(AF_INET,client_addr = sock.accept()
    t = Thread(target=echo_client,args=(client_sock,client_addr))
    t.daemon = True
    t.start()

echo_server(('',15000))

儘管這個也可以工作, 但是它不能抵禦有人試圖通過建立大量執行緒讓你伺服器資源枯竭而崩潰的攻擊行為。 通過使用預先初始化的執行緒池,你可以設定同時執行執行緒的上限數量。

你可能會關心建立大量執行緒會有什麼後果。 現代作業系統可以很輕鬆的建立幾千個執行緒的執行緒池。 甚至,同時幾千個執行緒等待工作並不會對其他程式碼產生效能影響。 當然了,如果所有執行緒同時被喚醒並立即在CPU上執行,那就不同了——特別是有了全域性直譯器鎖GIL。 通常,你應該只在I/O處理相關程式碼中使用執行緒池。

建立大的執行緒池的一個可能需要關注的問題是記憶體的使用。 例如,如果你在OS X系統上面建立2000個執行緒,系統顯示Python程序使用了超過9GB的虛擬記憶體。 不過,這個計算通常是有誤差的。當建立一個執行緒時,作業系統會預留一個虛擬記憶體區域來 放置執行緒的執行棧(通常是8MB大小)。但是這個記憶體只有一小片段被實際對映到真實記憶體中。 因此,Python程序使用到的真實記憶體其實很小 (比如,對於2000個執行緒來講,只使用到了70MB的真實記憶體,而不是9GB)。 如果你擔心虛擬記憶體大小,可以使用 threading.stack_size() 函式來降低它。例如:

import threading
threading.stack_size(65536)

如果你加上這條語句並再次執行前面的建立2000個執行緒試驗, 你會發現Python程序只使用到了大概210MB的虛擬記憶體,而真實記憶體使用量沒有變。 注意執行緒棧大小必須至少為32768位元組,通常是系統記憶體頁大小(4096、8192等)的整數倍。

以上就是Python 如何建立一個執行緒池的詳細內容,更多關於Python 建立執行緒池的資料請關注我們其它相關文章!