1. 程式人生 > >Python 線程

Python 線程

turn 就會 == bmi pri pre emp live www.

一、定義:

  線程顧名思義,就是一條流水線工作的過程,一條流水線必須屬於一個車間,一個車間的工作過程是一個進程,進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是執行單位

二、線程定義方式:

1、使用替換threading模塊提供的Thread

from threading import Thread
from multiprocessing import Process

def task():
    print(is running)

if __name__ == __main__:
    t=Thread(target=task,)
    
# t=Process(target=task,) t.start() print()

  2、自定義類,繼承Thread

from threading import Thread
from multiprocessing import Process
class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(%s is running %self.name)

if __name__ == __main__: t=MyThread(egon) # t=Process(target=task,) t.start() print()

三、多線程共享同一個進程內的資源

  因為線程間的數據是共享的所以都會用同一個資源

from threading import Thread
from multiprocessing import Process
n=100
def work():
    global n
    n=0

if __name__ == __main__:

    # p=Process(target=work,)
# p.start() # p.join() # print(‘主‘,n) t=Thread(target=work,) t.start() t.join() print(,n)

四、其它相關函數

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread,activeCount,enumerate,current_thread
import time
def task():
    print(%s is running %current_thread().getName())
    time.sleep(2)

if __name__ == __main__:
    t=Thread(target=task,)
    t.start()
    t.join()
    print(t.is_alive())
    print(t.getName())
    print(enumerate())
    print()
    print(activeCount())
current_thread的用法
from threading import Thread,activeCount,enumerate,current_thread
from multiprocessing import Process
import time

def task():
    print(%s is running %current_thread().getName())
    time.sleep(2)

if __name__ == __main__:
    p=Process(target=task)
    p.start()
    print(current_thread())
from threading import Thread,activeCount,enumerate,current_thread
from multiprocessing import Process
import time

def task():
    print(%s is running %current_thread().getName())
    time.sleep(2)

if __name__ == __main__:
    t1=Thread(target=task)
    t2=Thread(target=task)
    t3=Thread(target=task)
    t1.start()
    t2.start()
    t3.start()
    print(current_thread())

五、守護線程

守護線程則是主線程等待其它非守護線程結束,主線程結束則守護線程結束

#再看:守護線程

from threading import Thread
import time

def task1():
    print(123)
    time.sleep(10)
    print(123done)

def task2():
    print(456)
    time.sleep(1)
    print(456done)

if __name__ == __main__:
    t1=Thread(target=task1)
    t2=Thread(target=task2)
    t1.daemon=True
    t1.start()
    t2.start()
    print()

六、線程互斥鎖

  即:線程中誰搶到了鎖誰去執行,沒有搶到的則在等待

from threading import Thread,Lock
import time
n=100
def work():
    global n
    mutex.acquire()#搶到鎖加鎖
    temp=n
    time.sleep(0.1)
    n=temp-1
    mutex.release()#解鎖

if __name__ == __main__:
    mutex=Lock()
    l=[]
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        l.append(t)
        t.start()

    for t in l:
        t.join()
    print(run time:%s value:%s %(time.time()-start,n))

七:互斥鎖與join的區別

互斥鎖只是在重要的代碼階段加上誰搶到誰處理,而join則是一個一個的全部把所有的代碼都執行,大大加大執行代碼的時間

join實例:

from threading import Thread,Lock
import time
n=100
def work():
    time.sleep(0.05)
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1


if __name__ == __main__:
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        t.start()
        t.join()

    print(run time:%s value:%s %(time.time()-start,n))

互斥鎖實例:

#互斥鎖
from threading import Thread,Lock
import time
n=100
def work():
    time.sleep(0.05)
    global n
    mutex.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    mutex.release()

if __name__ == __main__:
    mutex=Lock()
    l=[]
    start=time.time()
    for i in range(100):
        t=Thread(target=work)
        l.append(t)
        t.start()

    for t in l:
        t.join()
    print(run time:%s value:%s %(time.time()-start,n))

八:線程死鎖與遞規鎖

  死鎖:則是幾個人在搶幾把鎖,但是一個人搶一把鎖,在沒有解這把鎖,則是去搶另一把,則永遠無法搶到,也沒法解除當前的鎖,由為死鎖

from threading import Thread,Lock,RLock
import time
mutexA=Lock()
mutexB=Lock()
class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(\033[45m%s 搶到A鎖\033[0m %self.name)
        mutexB.acquire()
        print(\033[44m%s 搶到B鎖\033[0m %self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(\033[44m%s 搶到B鎖\033[0m %self.name)
        time.sleep(1)
        mutexA.acquire()
        print(\033[45m%s 搶到A鎖\033[0m %self.name)
        mutexA.release()
        mutexB.release()

遞歸鎖:

  則需要threading導入RLock,用這個每一個人拿到的都是這把鎖,解除這把鎖之後才能拿到下把鎖,這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

#遞歸鎖
from threading import Thread,Lock,RLock
import time
mutex=RLock()
class Mythread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutex.acquire()
        print(\033[45m%s 搶到A鎖\033[0m %self.name)
        mutex.acquire()
        print(\033[44m%s 搶到B鎖\033[0m %self.name)
        mutex.release()
        mutex.release()

    def f2(self):
        mutex.acquire()
        print(\033[44m%s 搶到B鎖\033[0m %self.name)
        time.sleep(1)
        mutex.acquire()
        print(\033[45m%s 搶到A鎖\033[0m %self.name)
        mutex.release()
        mutex.release()

九:信號量

同進程的一樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()

from threading import Thread,current_thread,Semaphore
import time,random

sm=Semaphore(5)
def work():
    sm.acquire()
    print(%s 上廁所 %current_thread().getName())
    time.sleep(random.randint(1,3))
    sm.release()

if __name__ == __main__:
    for i in range(20):
        t=Thread(target=work)
        t.start()

十:Event

同進程的一樣

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行

技術分享
event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程;

event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;

event.clear():恢復event的狀態值為False。
from threading import Thread,current_thread,Event
import time
event=Event()

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise ConnectionError(鏈接失敗)
        print(%s 等待第%s次鏈接mysql %(current_thread().getName(),count))
        event.wait(0.5)
        count+=1

    print(%s 鏈接ok % current_thread().getName())


def check_mysql():
    print(%s 正在檢查mysql狀態 %current_thread().getName())
    time.sleep(1)
    event.set()


if __name__ == __main__:
    t1=Thread(target=conn_mysql)
    t2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    t1.start()
    t2.start()
    check.start()

十一:定時器

  定義:指定n秒後執行某操作

from threading import Timer

def hello(n):
    print("hello, world",n)

t = Timer(3, hello,args=(11,))#3秒後執行
t.start()  # after 1 seconds, "hello, world" will be printed

十二:線程queue

  定義:線程的隊列,使用import queue,用法與進程Queue一樣

import queue

q=queue.Queue(3) #隊列:先進先出
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())


q=queue.LifoQueue(3) #堆棧:後進先出
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())


q=queue.PriorityQueue(3) #數字越小優先級越高
q.put((10,data1))
q.put((11,data2))
q.put((9,data3))

print(q.get())
print(q.get())
print(q.get())

十三、線程池

定義:則是同時開啟多少線程,如果並發則用的線程名則還是已開啟的

#線程池
import requests #pip3 install requests
import os,time,threading
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
def get_page(url):
    print(<%s> get :%s %(threading.current_thread().getName(),url))
    respone = requests.get(url)
    if respone.status_code == 200:
        return {url:url,text:respone.text}

def parse_page(obj):
    dic=obj.result()
    print(<%s> parse :%s %(threading.current_thread().getName(),dic[url]))
    time.sleep(0.5)
    res=url:%s size:%s\n %(dic[url],len(dic[text])) #模擬解析網頁內容
    with open(db.txt,a) as f:
        f.write(res)


if __name__ == __main__:

    # p=Pool(4)
    p=ThreadPoolExecutor(3) #同時開始3個線程
    urls = [
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
        http://www.baidu.com,
    ]


    for url in urls:
        # p.apply_async(get_page,args=(url,),callback=parse_page)
        p.submit(get_page,url).add_done_callback(parse_page)

    p.shutdown()
    print(主進程pid:,os.getpid())

Python 線程