1. 程式人生 > >python 並發和線程

python 並發和線程

一個 war 效果 也不會 關於 另一個 while循環 隊列 process

並發和線程

基本概念 - 並行、並發

並行, parallel

互不幹擾的在同一時刻做多件事;

如,同一時刻,同時有多輛車在多條車道上跑,即同時發生的概念.

並發, concurrency

同時做某些事,但是強調同一時段做多件事.

如,同一路口,發生了車輛要同時通過路面的事件.

隊列, 緩沖區

類似排隊,是一種天然解決並發的辦法.排隊區域就是緩沖區.

解決並發:

【 "食堂打飯模型", 中午12點,大家都湧向食堂,就是並發.人很多就是高並發.】

1、隊列, 緩沖區:

隊列: 即排隊.

緩沖區: 排成的隊列.

優先隊列: 如果有男生隊伍和女生隊伍,女生隊伍優先打飯,就是優先隊列.

2、爭搶:

鎖機制: 爭搶打飯,有人搶到,該窗口在某一時刻就只能為這個人服務,鎖定窗口,即鎖機制.

爭搶也是一種高並發解決方案,但是有可能有人很長時間搶不到,所以不推薦.

3、預處理:

統計大家愛吃的菜品,最愛吃的80%熱門菜提前做好,20%冷門菜現做,這樣即使有人鎖定窗口,也能很快釋放.

這是一種提前加載用戶需要的數據的思路,預處理思想,緩存常用.

4、並行:

開多個打飯窗口,同時提供服務.

IT日常可以通過購買更多服務器,或多開線程,進程實現並行處理,解決並發問題.

這是一種水平擴展的思路.

註: 如果線程在單CPU上處理,就不是並行了.

5、提速:

通過提高單個窗口的打飯速度,也是解決並發的方式.

IT方面提高單個CPU性能,或單個服務器安裝更多的CPU.

這是一種垂直擴展的思想.

6、消息中間件:

如上地地鐵站的九曲回腸的走廊,緩沖人流.

常見消息中間件: RabbitMQ, ActiveMQ(Apache), RocketMQ(阿裏Apache), kafka(Apache)等.

【以上例子說明: 技術源於生活! 】

進程和線程

a) 在實現了線程的操作系統中,線程是操作系統能夠運算調度的最小單位.

b) 線程被包含在進程中,是進程的實際運作單位.

c) 一個程序的執行實例就是一個進程.

進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎.

進程和程序的關系: 進程是線程的容器.

Linux進程有父進程和子進程之分,windows的進程是平等關系.

線程有時稱為輕量級進程,一個標準的線程由線程ID,當前指令指針,寄存器集合和堆棧組成.

當運行一個程序時,OS會創建一個進程。它會使用系統資源(CPU、內存和磁盤空間)和OS內核中的數據結構(文件、網絡連接、用量統計等)。

進程之間是互相隔離的,即一個進程既無法訪問其他進程的內容,也無法操作其他進程。

操作系統會跟蹤所有正在運行的進程,給每個進程一小段運行時間,然後切換到其他進程,這樣既可以做到公平又可以響應用戶操作。

可以在圖形界面中查看進程狀態,在在Windows上可以使用任務管理器。也可以自己編寫程序來獲取進程信息。

# 獲取正在運行的python解釋器的進程號和當前工作目錄,及用戶ID、用戶組ID。

In [1]: import os

In [2]: os.getpid()

Out[2]: 2550


In [3]: os.getuid()

Out[3]: 0

 
In [4]: os.getcwd()

Out[4]: ‘/root‘

 
In [5]: os.getgid()

Out[5]: 0

 
In [6]:

  

對線程、線程的理解:

  • 進程是獨立的王國,進程間不能隨便共享數據.
  • 線程是省份,同一進程內的線程可以共享進程的資源,每一個線程有自己獨立的堆棧.

線程的狀態:

  • 就緒(Ready): 線程一旦運行,就在等待被調度.
  • 運行(Running): 線程正在運行.
  • 阻塞(Blocked): 線程等待外部事件發生而無法運行,如I/O操作.
  • 終止(Terminated): 線程完成或退出,或被取消.

python中的進程和線程: 進程會啟動一個解釋器進程,線程共享一個解釋器進程.

python的線程開發

python線程開發使用標準庫threading.

thread類

# 簽名

def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

 

  • target: 線程調用的對象,就是目標函數.
  • name: 為線程起個名字.
  • args: 為目標函數傳遞實參, 元組.
  • kwargs: 為目標函數關鍵字傳參, 字典.

線程啟動

import threading

# 最簡單的線程程序

def worker():

    print("I‘m working")

    print("Finished")

 

t = threading.Thread(target=worker, name=‘worker‘)  # 線程對象.

t.start()

通過threading.Thread創建一個線程對象,target是目標函數,name可以指定名稱.

但是線程沒有啟動,需要調用start方法.

線程會執行函數,是因為線程中就是執行代碼的,而最簡單的封裝就是函數,所以還是函數調用.

函數執行完,線程也會隨之退出.

如果不讓線程退出,或者讓線程一直工作: 函數內部使用while循環.

import threading

import time

def worker():

    while True:

        time.sleep(1)

        print("I‘m work")

    print(‘Finished‘)

 

t = threading.Thread(target=worker, name=‘worker‘)  # 線程對象.

t.start()   # 啟動.

  

線程退出

python沒有提供線程退出的方法,在下面情況時會退出:

  • 線程函數內語句執行完畢.
  • 線程函數中拋出未處理的異常.
import threading

import time  

 

def worker():

    count = 0

    while True:

        if (count > 5):

            raise RuntimeError()

            # return

        time.sleep(1)

        print("I‘m working")

        count += 1

t = threading.Thread(target=worker, name=‘worker‘)  # 線程對象.

t.start()  # 啟動.

 

print("==End==")

python的線程沒有優先級,沒有線程組的概念,也不能被銷毀、停止、掛起,自然也沒有恢復、中斷.

線程的傳參

import threading

import time

 

def add(x, y):

    print(‘{} + {} = {}‘.format(x, y, x + y, threading.current_thread()))

 

thread1 = threading.Thread(target=add, name=‘add‘, args=(4, 5))  # 線程對象.

thread1.start()  # 啟動.

time.sleep(2)

 

thread2 = threading.Thread(target=add, name=‘add‘,args=(5, ), kwargs={‘y‘: 4})  # 線程對象.

thread2.start()  # 啟動.

time.sleep(2)

 

thread3 = threading.Thread(target=add, name=‘add‘, kwargs={‘x‘: 4, ‘y‘: 5})  # 線程對象.

thread3.start()  # 啟動.

  

線程傳參和函數傳參沒什麽區別,本質上就是函數傳參.

threading的屬性和方法

current_thread() # 返回當前線程對象.

main_thread() # 返回主線程對象.

active_count() # 當前處於alive狀態的線程個數.

enumerate() # 返回所有活著的線程的列表,不包括已經終止的線程和未開始的線程.

get_ident() # 返回當前線程ID,非0整數.

active_count、enumerate方法返回的值還包括主線程。

import threading

import time

 

def showthreadinfo():

    print(‘currentthread = {}‘.format(threading.current_thread()))

    print(‘main thread = {}‘.format(threading.main_thread()), ‘"主線程對象"‘)

    print(‘active count = {}‘.format(threading.active_count()), ‘"alive"‘)

 

def worker():

    count = 1

    showthreadinfo()

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print("I‘m working")

 

t = threading.Thread(target=worker, name=‘worker‘)  # 線程對象.

showthreadinfo()

t.start()  # 啟動.

 

print(‘==END==‘)

  

thread實例的屬性和方法

name: 只是一個名稱標識,可以重名, getName()、setName()來獲取、設置這個名詞。

ident: 線程ID, 它是非0整數。線程啟動後才會有ID,否則為None。線程退出,此ID依舊可以訪問。此ID可以重復使用。

is_alive(): 返回線程是否活著。

註: 線程的name是一個名稱,可以重復; ID必須唯一,但可以在線程退出後再利用。

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print(threading.current_thread().name, ‘~~~~~~~~~~~~~~~~~~~~~~~‘)

 

t = threading.Thread(name=‘worker‘, target=worker)

print(t.ident)

t.start()

 

while True:

    time.sleep(1)

    if t.is_alive():

        print(‘{} {} alive‘.format(t.name, t.ident))

    else:

        print(‘{} {} dead‘.format(t.name, t.ident))

t.start()

  

start(): 啟動線程。每一個線程必須且只能執行該方法一次。

run(): 運行線程函數。

為了演示,派生一個Thread子類

# start方法.

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count >= 5):

            break

        time.sleep(1)

        count += 1

        print(‘worker running‘)

 

class MyThread(threading.Thread):

    def start(self):

        print(‘start~~~~~~~~~~~~~‘)

        super().start()

 

    def run(self):

        print(‘run~~~~~~~~~~~~~~~~~‘)

        super().run()

 

t = MyThread(name=‘worker‘, target=worker)

t.start()

run方法

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print(‘worker running‘)

 

class MyThread(threading.Thread):

    def start(self):

        print(‘start~~~~~~~~~~~~~~~‘)

        super().start()

 

    def run(self):

        print(‘run~~~~~~~~~~~~~~~~~‘)

        super().run()

 

t = MyThread(name=‘worker‘, target=worker)

  

# t.start()

t.run()

start()方法會調用run()方法,而run()方法可以運行函數。

這兩個方法看似功能重復,但不能只留其一,如下:

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print("worker running")

        print(threading.current_thread().name)

 

class MyThread(threading.Thread):

    def start(self):

        print(‘start~~~~~~~~~~~~~‘)

        super().start()

 

    def run(self):

        print(‘run~~~~~~~~~~~~~~~‘)

        super().run()

 

t = MyThread(name=‘worker‘, target=worker)

# t.start()

  

t.run() # 分別執行start或者run方法。

使用start方法啟動線程,啟動了一個新的線程,名字叫做worker running,但是使用run方法啟動的線程,並沒有啟動新的線程,只是在主線程中調用了一個普通的函數而已。

因此,啟動線程要使用start方法,才能啟動多個線程。

多線程

顧名思義,多個線程,一個進程中如果有多個線程,就是多線程,實現一種並發。

import threading

import time

 

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(2)

        count += 1

        print(‘worker running‘)

        print(threading.current_thread().name, threading.current_thread().ident)

 

class MyThread(threading.Thread):

    def start(self):

        print(‘start~~~~~~~~~~~~~~‘)

        super().start()

 

    def run(self):

        print(‘run~~~~~~~~~~~~~~~~~~‘)

        super().run()  # 查看父類在做什麽?

 

t1 = MyThread(name=‘worker1‘, target=worker)

t2 = MyThread(name=‘worker2‘, target=worker)

 

t1.start()

t2.start()

可以看到worker1和worker2交替執行。

換成run方法試試:

import threading

import time

def worker():

    count = 0

    while True:

        if (count > 5):

            break

        time.sleep(1)

        count += 1

        print(‘worker running‘)

        print(threading.current_thread().name, threading.current_thread().ident)

 

class MyThread(threading.Thread):

    def start(self):

        print(‘start~~~~~~‘)

        super().start()

 

    def run(self):

        print(‘run~~~~~~~~~~~~‘)

        super().run()

 

t1 = MyThread(name=‘worker1‘, target=worker)

t2 = MyThread(name=‘worker2‘, target=worker)

 

# t1.start()

# t2.start()

t1.run()

t2.run()

沒有開新的線程,這就是普通函數調用,所以執行完t1.run(),然後執行t2.run(),這裏就不是多線程。

當使用start方法啟動線程後,進程內有多個活動的線程並行的工作,就是多線程。

一個進程中至少有一個線程,並作為程序的入口,這個線程就是主線程。一個進程至少有一個主線程。

其他線程稱為工作線程。

線程安全

需要在ipython中演示:

In [1]: import threading

   ...:  def worker():

   ...:     for x in range(5):

   ...:         print("{} is running".format(threading.current_thread().name))

   ...:

   ...:  for x in range(1, 5):

   ...:     name = ‘worker{}‘.format(x)

   ...:     t = threading.Thread(name=name, target=worker)

   ...:     t.start()

   ...:

  

可以看到運行結果中,本應該是一行行打印,但很多字符串打印在了一起,這說明print函數被打斷了,被線程切換打斷了。

print函數分兩步,第一步打印字符串,第二部換行,就在這之間,發生了線程的切換。

說明print函數不是線程安全函數。

線程安全: 線程執行一段代碼,不會產生不確定的結果,那這段代碼就是線程安全的。

1、不讓print打印換行:

import threading

def worker():

    for x in range(100):

        print(‘{} is running\n‘.format(threading.current_thread().name), end=‘‘)

 

for x in range(1, 5):

    name = ‘worker{}‘.format(x)

    t = threading.Thread(name=name, target=worker)

    t.start()

  

字符串是不可變類型,它可以作為一個整體不可分割輸出。end=‘‘的作用就是不讓print輸出換行。

2、使用logging.

標準庫裏面的logging模塊、日誌處理模塊、線程安全、生成環境代碼都使用logging。

import threading

import logging

 

def worker():

    for x in range(100):

        # print("{} is running.\n".format(threading.current_thread().name), end=‘‘)

        logging.warning(‘{} is running‘.format(threading.current_thread().name))

 

for x in range(1, 5):

    name = ‘work{}‘.format(x)

    t = threading.Thread(name=name, target=worker)

    t.start()

  

daemon線程和non-daemon線程

註:這裏的daemon不是Linux中的守護進程。

進程靠線程執行代碼,至少有一個主線程,其他線程是工作線程。

主線程是第一個啟動的線程。

父線程:如果線程A中啟動了一個線程B,A就是B的父線程。

子線程:B就是A的子線程。

python中構造線程的時候可以設置daemon屬性,這個屬性必須在start方法之前設置好。

源碼Thread的__init__方法中:

if daemon is not None:

    self._daemonic = daemon  # 用戶設定bool值。

else:

    self._daemonic = current_thread().daemon

self._ident = None

  

線程daemon屬性,如果設定就是用戶的設置,否則就取當前線程的daemon值。

主線程是non-daemon,即daemon=False。

import time

import threading

 

def foo():

    time.sleep(5)

    for i in range(20):

        print(i)

# 主線程是non-daemon線程.

t = threading.Thread(target=foo, daemon=False)

t.start()

print(‘Main Thread Exiting‘)

運行發現線程t依然執行,主線程已經執行完,但是一直等著線程t.

修改為 t = threading.Threading(target=foo, daemon=True),運行發現主線程執行完程序立即結束了,根本沒有等線程t.

import threading

import logging

logging.basicConfig(level=logging.INFO) #警告級別

import time

 

def worker():

    for x in range(10):

        time.sleep(1)

        msg = ("{} is running".format(threading.current_thread()))

        logging.info(msg)

        t = threading.Thread(target=worker1,name="worker1-{}".format(x),daemon=False)

        t.start()

        # t.join()

 

def worker1():

    for x in range(10):

        time.sleep(0.3)

        msg = ("¥¥¥¥¥{} is running".format(threading.current_thread()))

        logging.info(msg)

 

t = threading.Thread(target=worker,name=‘worker-{}‘.format(0),daemon=True)

t.start()

# t.join()

time.sleep(0.3)

print(‘ending‘)

print(threading.enumerate())

  

結論:

daemon=False 運行發現子線程依然執行,主線程已經執行完,但是主線程會一直等著子線程執行完.

daemon=True 運行發現主線程執行完程序立即結束了。

daemon屬性:表示線程是否是daemon線程,這個值必須在start()之前設置,否則引發RuntimeError異常。

isDaemon():是否是daemon線程。

setDaemon:設置為daemon線程,必須在start方法之前設置。

總結:

線程具有一個daemon屬性,可以顯式設置為True或False,也可以不設置,不設置則取默認值None。

如果不設置daemon,就取當前線程的daemon來設置它。子子線程繼承子線程的daemon值,作用和設置None一樣。

主線程是non-daemon線程,即daemon=False。

從主線程創建的所有線程不設置daemon屬性,則默認都是daemon=False,也就是non-daemon線程。

python程序在沒有活著的non-daemon線程運行時退出,也就是剩下的只能是daemon線程,主線程才能退出,否則主線程就只能等待。

如下程序輸出:

import time

import threading

def bar():

    time.sleep(10)

    print(‘bar‘)

 

def foo():

    for i in range(20):

        print(i)

    t = threading.Thread(target=bar, daemon=False)

    t.start()

 

# 主線程是non-daemon線程.

t = threading.Thread(target=foo, daemon=True)

t.start()

print(‘Main Threading Exiting‘)

上例中,沒有輸出bar這個字符串,如何修改才會打印出來bar?

 import time

import threading

 

def bar():

    time.sleep(1)

    print(‘bar‘)

 

def foo():

    for i in range(5):

        print(i)

    t = threading.Thread(target=bar, daemon=False)

    t.start()

 

# 主線程是non-daemon線程.

t = threading.Thread(target=foo, daemon=True)

t.start()

time.sleep(1)

print(‘Main Threading Exiting‘)

再看一個例子,看看主線程合適結束daemon線程。

 import time

import threading

 

def foo(n):

    for i in range(n):

        print(i)

        time.sleep(1)

 

t1 = threading.Thread(target=foo, args = (10, ), daemon=True)  # 調換10和20,看看效果。

t1.start()

t2 = threading.Thread(target=foo, args = (20, ), daemon=False)

t2.start()

 

time.sleep(2)

print(‘Main Threading Exiting‘)

上例說明,如果有non-daemon線程的時候,主線程退出時,也不會殺掉所有daemon線程,直到所有non-daemon線程全部結束,

如果還有daemon線程,主線程需要退出,會結束所有 daemon線程,退出。

join方法

import time

import threading

 

def foo(n):

    for i in range(n):

        print(i)

        time.sleep(1)

t1 = threading.Thread(target=foo, args=(10, ), daemon=False)

t1.start()

t1.join()  # 設置join.

print(‘Main Thread Exiting‘)

使用了join方法後,daemon線程執行完了,主線程才退出。

join(timeout=None),是線程的標準方法之一。

一個線程中調用另一個線程的join方法,調用者將被阻塞,直到被調用線程終止。

一個線程可以被join多次。

timeout參數指定調用者等待多久,沒有設置超時,就一直等待被調用線程結束。

調用誰的join方法,就是join誰,就要等誰。

python 並發和線程