1. 程式人生 > >Python第十周 學習筆記(2)_多線程

Python第十周 學習筆記(2)_多線程

學習筆記

多線程

並發

  • concurrency
    • 同一時間內出現多個請求,高並發就是短時間內出現大量請求

並行

  • parallel
    • 並行是解決並發的一個方法

並發的解決

  • 食堂打飯模型

    1、隊列、緩沖區

  • queue(或LifoQueue、PriorityQueue)先進先出緩沖區(排隊打飯),可以設置一個優先隊列(女生優先)

2、爭搶

  • 一旦一個請求獲得資源後,會產生排他鎖(不排隊,誰搶到就是輪到誰打飯)
  • 缺點:有可能存在某些請求一直獲得不到資源

3、預處理

  • 經常請求的資源可以預處理(緩存)。減少請求占用資源的時間(經常被點的菜品可以提前做)

4、並行

  • 通過購買服務器、或多開進程、線程實現並行處理,屬於水平擴展(增加打飯窗口)

5、提速

  • 提高單個CPU性能,貨單個服務器安裝更多CPU,屬於垂直擴展(提高單個窗口打飯速度)

6、消息中間件

  • 起緩沖作用,常見消息中間件:RabbitMQ、ActiveMQ、RocketMQ、kafka

進程

  • 進程間不可隨意共享數據
  • 進程是線程的容器
  • linux存在父進程、子進程(fork)

線程

  • 一個進程可擁有多個線程
  • 線程可以共享進程的資源
  • 每一個線程都擁有線程ID、當前指令指針、寄存器集合與獨立的堆棧
  • 創建線程比創建進程快10-100倍

線程的狀態

ready

  • 線程能夠運行,但在等待被調度。可能線程剛剛創建啟動,或剛剛從阻塞中恢復,或者被其他線程搶占

running

  • 線程正在運行

blocked

  • 線程等待外部事件發生而無法運行,如IO操作

terminated

  • 線程完成,或退出被取消

技術分享圖片

Python線程

  • 進程會啟動一個解釋器進程,線程共享一個解釋器進程

Thread類

def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
  • target

    • 線程調用的對象,就是目標函數
  • name

    • 為線程起個名字
  • args

    • 為目標函數傳遞實參,參數類型需為元組
  • kwargs

    • 為目標函數關鍵字傳參,需為字典
  • daemon
    • 下文詳述

啟動線程

threading.Thread(target=fn, name=‘fn‘).start()

線程退出

  • Python沒有提供線程退出的方法,線程在下面情況時退出
    • 1、線程函數內語句執行完畢
    • 2、線程函數中拋出未處理異常

Python線程沒有優先級、沒有線程組的概念,不能被銷毀、停止、掛起、恢復、中斷

threading屬性、方法

  • current_thread()

    • 返回當前線程對象
  • main_thread()

    • 返回主線程對象
  • active_count()

    • 當前處於alive狀態的線程個數,包括主線程
  • enumerate()

    • 返回所有活著的線程的列表,不包括已經終止的線程和未開始的線程,包括主線程
  • get_ident()
    • 返回當前線程的ID,非0整數

Thread實例屬性、方法

  • name

    • 線程實例的名稱 初始化中的name參數傳入,getName()、setName()可獲取、設置(一般不改)
  • ident

    • 線程ID 線程啟動後才有ID,否則為None。線程退出此ID依舊可以訪問。ID可重復使用
  • is_alive()

    • 返回線程是否存活
  • start()

    • 啟動線程。每個線程只能執行此方法一次
    • start()會調用run()
  • start函數的實現:
if not self._initialized:
    raise RuntimeError("thread.__init__() not called")

if self._started.is_set():
    raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
    _limbo[self] = self
try:
    _start_new_thread(self._bootstrap, ())
except Exception:
    with _active_limbo_lock:
        del _limbo[self]
    raise
self._started.wait()
  • run()

    • 運行線程函數
    • 單運行run()不會建立新線程,只是單純的函數調用
  • run函數的實現:
try:
    if self._target:
        self._target(*self._args, **self._kwargs)
finally:
    # Avoid a refcycle if the thread is running a function with
    # an argument that has a member that points to the thread.
    del self._target, self._args, self._kwargs

每次調用run都會刪除引用self._target, self._args, self._kwargs

  • 示例:
import threading
import time

class MyThread(threading.Thread):
    def start(self):
        print(‘-----start-----‘)
        super().start()

    def run(self):
        print(‘---------run----------‘)
        super().run()

def pr1(lst):
    for _ in lst:
        print(‘{} is running‘.format(threading.current_thread().name))

def worker():
    name = ‘worker{}‘.format(1)
    t = MyThread(target=pr1, name=name, args=([1, 2],))
    # time.sleep(.5)
    t.start()
    # time.sleep(1)
    t.run()

if __name__ == ‘__main__‘:
    worker()
  • 結果(可能發生):
    技術分享圖片

  • 由於多線程,有時start()中調用run()的del刪除引用來不及,如上圖結果所示在刪除引用前第二次的run()依然可以調用,打印出兩次MainThread is running

多線程


  • 一個進程中至少有一個主線程作為程序入口,其他線程稱作工作線程

線程安全

  • 線程執行一段代碼,不會產生不確定行結果,那這段代碼是線程安全的

  • 示例:
import threading

def worker():
    for x in range(100):
        print("{} is running.".format(threading.current_thread().name))

for x in range(5):
    name="worker{}".format(x)
    t=threading.Thread(name=name,target=worker)
    t.start()
  • 結果

技術分享圖片

  • 原因
    • print函數打印時先打印指定的內容再打印默認的換行符,多線程中,一個print的調用中可能會出現線程的切換
    • print函數是線程不安全的

如何解決?

  • 1.print只處理輸入的字符
    • 改為

print("{} is running.\n".format(threading.current_thread().name), end=‘‘)
  • 2.logging

    • 日誌處理模塊,線程安全

    • 示例:
import threading
import logging

def worker():
    for x in range(100):
        logging.warning("{} is running.".format(threading.current_thread().name))

for i in range(1,6):
    name="worker{}".format(i)
    t=threading.Thread(name=name,target=worker)
    t.start()

daemon和non-daemon線程

  • daemon=True

    • 當主線程執行完,如果不存在正在執行的non-daemon線程,則強制所有線程同主線程結束
  • daemon=False

    • 即使主線程執行完,會等待non-daemon線程執行完畢
  • daemon=None
    • 與當前調用線程(父線程)的daemon類型相同

daemon必須在start()之前設置,否則引發RuntimeError異常

  • isDaemon()

    • 是否是daemon線程
  • setDaemon

    • 設置為daemon線程,必須在start之前設置
  • join(timeout=None)
    • 一個線程中調用另一個線程的join方法,調用者將變為阻塞狀態,直到被調用線程終止
    • 一個線程可以被join多次

daemon線程應用場景

  • 1.後臺任務,如心跳包、監控
  • 2.主線程才有用的線程,如主線程中維護公共資源,當主線程已經清理了,準備退出,而工作線程使用這些資源工作也已經沒有意義。一起退出最合適

threading.local類

  • 將這個類實例化得到一個全局對象,但不同的線程使用這個對象存儲的數據其他線程看不見。

threading.Timer

  • 繼承自Thread,用來定義多久執行一個函數

  • threading.Timer(interval, function, args=None, kwargs=None)
    • start方法執行後,Timer對象等待interval時間後,執行function,
    • 如果在執行函數之前的等待階段,使用了cancel方法,就會跳過函數執行

線程同步


概念

  • 線程同步,線程間協同,通過鎖,讓一個線程訪問某些數據時,其他線程不能訪問這些數據,直到該線程完成對數據的操作

  • 不同操作系統實現技術有所不同,有臨界區(Critical Section)、互斥量(Mutex)、信號量(Semaphore)、事件(Event)等

Event

  • 是線程間通信機制中最簡單的實現,使用一個內部的標記flag的變化來操作

  • set()

    • 將標記設為True
  • clear()

    • 將標記設為False
  • is_set()

    • 標記是否為True
  • wait(timeout=None)
    • 設置等待標記為True的時長,None為無限等待,直到等到返回True,未等到超時返回False

使用同一個Event對象的標記flag
不限制等待的個數

wait效率優於time.sleep,它會更快切換線程,提高並發效率

Lock

  • 一旦線程獲得鎖,其他試圖獲取鎖的線程將被阻塞

  • acquire(blocking=True, timeout=-1)

    • 默認阻塞,阻塞可以設置超時時間。非阻塞時,timeout禁止設置。成功獲取鎖,返回True,否則返回False
  • release()
    • 釋放鎖。可以從任何線程調用釋放
    • 已上鎖的鎖,會被重置為unlocked未上鎖的鎖上調用,拋RuntimeError異常

鎖保護了數據完整性,但是性能下降很多

加鎖、解鎖常用語句:

  • 1.使用try...finallly語句
  • 2.with上下文

鎖的應用場景

  • 鎖適用於訪問和修改同一個共享資源的時候,即讀寫同一個資源的時候
  • 如果只讀取同一個共享資源不需要鎖

非阻塞鎖

import threading
import time
import logging

FORMAT = ‘%(asctime)-15s\t [%(threadName)s %(thread)8d] %(message)s‘
logging.basicConfig(format=FORMAT, level=logging.INFO)

def worker(tasks):
    for task in tasks:
        time.sleep(0.001)
        if task.lock.acquire(False):
            logging.info(‘{} {} begin to start‘.format(threading.current_thread(), task.name))
        else:
            logging.info(‘{} {} is working‘.format(threading.current_thread(), task.name))

class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

tasks = [Task(‘task-{}‘.format(x)) for x in range(10)]

for i in range(5):
    threading.Thread(target=worker, name=‘worker-{}‘.format(i), args=(tasks,)).start()

可重入鎖

  • 線程相關鎖
  • 線程A獲得可重復鎖,並可以多次成功獲取,不會阻塞。最後要在線程A中做和acquire次數相同的release

  • 可重入鎖,與線程相關,可在一個線程中獲取鎖,並可繼續在同一線程中不阻塞獲取鎖。當鎖未釋放完,其他線程獲取鎖就會阻塞,直到當前持有鎖的線程釋放完鎖

Condition

  • Condition(lock=None),可以傳入一個Lock或Rlock對象,默認是Rlock

  • acquire(*args)

    • 獲取鎖
  • wait(self, timeout=None)

    • 等待或超時
  • notify(n=1)

    • 喚醒至多指定數目個數的等待的線程,沒有等待的線程就沒有任何操作
  • notify_all()

    • 喚醒所有等待的線程
  • 用於生產者、消費者模型,解決生產者消費者速度匹配問題
  • 采用了通知機制,非常有效率

使用方式

  • 使用Condition,必須先acquire,用完了要release,因為內部使用了鎖,最好的方式是使用with上下文
  • 消費者wait,等待通知
  • 生產者生產好消息,對消費者發通知,可以使用notify或notify_all

Python第十周 學習筆記(2)_多線程