1. 程式人生 > >Python學習—python中的線程

Python學習—python中的線程

同一時間 版本 完成 相互 目標 check 歐洲 如何選擇 方案

1.線程定義

線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務。一個進程至少有一個線程,一個進程必定有一個主線程。

2.創建線程

創建線程的兩個模塊:
(1)thread(在python3中改名為_thread)
(2)threding
_thread提供了低級別的、原始的線程以及一個簡單的鎖。threading基於Java的線程模型設計。thread和threading模塊都可以用來創建和管理線程,而thread模塊提供了基本的線程和鎖支持。threading提供的是更高級的完全的線程管理。低級別的thread模塊是推薦給高手用,一般應用程序推薦使用更高級的threading模塊:

1.它更先進,有完善的線程管理支持,此外,在thread模塊的一些屬性會和threading模塊的這些屬性沖突。
2.thread模塊有很少的(實際上是一個)同步原語,而threading卻有很多。
3.thread模塊沒有很好的控制,特別當你的進程退出時,比如:當主線程執行完退出時,其他的線程都會無警告,無保存的死亡,而threading會允許默認,重要的子線程完成後再退出,它可以特別指定daemon類型的線程。

1.通過模塊直接創建線程

_thread模塊創建進程

import _thread

def job(name):
    print("%s正在做工作........" %name)
    print("%s工作完成..........." %name)

if __name__ == "__main__":
    try:
        #_thread模塊 創建2個線程,再加上主線程,這個程序運行就一共有三個線程
        _thread.start_new_thread(job,(‘ddd‘,))
        _thread.start_new_thread(job,(‘eee‘,))
    except Exception as e:
        print("創建線程失敗。",e)
    else:
        print("創建線程成功。")
    print(‘主線程結束‘)

每次運行程序可以看到不同的結果:

(1)
創建線程成功。eee正在做工作........
eee工作完成...........
ddd正在做工作........
ddd工作完成...........

(2)
創建線程成功。ddd正在做工作........eee正在做工作........
eee工作完成...........
ddd工作完成...........

(3)
創建線程成功。
ddd正在做工作........

(4)
創建線程成功。eee正在做工作........

這些結果不同,是因為線程並發執行,三個線程來回切換在cpu工作,且當主線程結束後,不管其它線程是否完成工作都被迫結束。

通過threading模塊創建線程

def job(name):
    print("%s正在做第一部分工作........" %name)
    print("%s正在做第二部分工作........" %name)
    print("%s正在做第三部分工作........" %name)
    print("%s工作完成..........." %name)

if __name__ == "__main__":
    try:
        #threading模塊 創建新的線程 返回一個線程對象
        #target 為線程需要做的任務,args為任務傳遞所需要參數(參數用元組組織起來),name為創建的線程命名(可以不取名)
        t1 = threading.Thread(target=job,args=(‘aaa‘,),name=‘job1_name‘)
        # start方法使線程開始執行
        t1.start()
        t2 = threading.Thread(target=job,args=(‘bbb‘,),name=‘job2_name‘)
        t2.start()
    except Exception as e:
        print("創建線程失敗\n",e)
    print(‘主線程結束.....‘)

每次運行程序的結果:

(1)
aaa正在做第一部分工作........
aaa正在做第二部分工作........
bbb正在做第一部分工作........
主線程結束.....
aaa正在做第三部分工作........
aaa工作完成...........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........

(2)
aaa正在做第一部分工作........
bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........主線程結束.....
bbb工作完成...........
aaa正在做第二部分工作........
aaa正在做第三部分工作........
aaa工作完成...........

(3)
aaa正在做第一部分工作........
aaa正在做第二部分工作........
aaa正在做第三部分工作........bbb正在做第一部分工作........

aaa工作完成...........bbb正在做第二部分工作........
主線程結束.....
bbb正在做第三部分工作........
bbb工作完成...........

可以看到,不同的多個線程是相互交叉著在cpu執行的,和_thread不同的是它創建了一個線程類對象,也不會因為主線程的結束而結束所有的線程。

使用join方法
在A線程中調用了B線程的join法時,表示只有當B線程執行完畢時,A線程才能繼續執行。多個線程使用了join方法,剩下的其它線程只有在這些線程執行完後才能繼續執行。
這裏調用的join方法是沒有傳參的,join方法其實也可以傳遞一個參數給它的。
join方法中如果傳入參數,則表示這樣的意思:如果A線程中掉用B線程的join(10),則表示A線程會等待B線程執行10毫秒,10毫秒過後,A、B線程並行執行。
需要註意的是,jdk規定,join(0)的意思不是A線程等待B線程0秒,而是A線程等待B線程無限時間,直到B線程執行完畢,即join(0)等價於join()。

def job(name):
    print("%s正在做第一部分工作........" %name)
    print("%s正在做第二部分工作........" %name)
    print("%s正在做第三部分工作........" %name)
    print("%s工作完成..........." %name)

if __name__ == "__main__":
    try:
        #threading模塊 創建新的線程 返回一個線程對象
        #target 為線程需要做的任務,args為任務傳遞所需要參數(參數用元組組織起來),name為創建的線程命名(可以不取名)
        t1 = threading.Thread(target=job,args=(‘aaa‘,),name=‘job1_name‘)
        # start方法使線程開始執行
        t1.start()
        t2 = threading.Thread(target=job,args=(‘bbb‘,),name=‘job2_name‘)
        t2.start()
        t1.join()
        t2.join()
    except Exception as e:
        print("創建線程失敗\n",e)
    print(‘主線程結束.....‘)

每次運行程序的結果:

(1)
aaa正在做第一部分工作........
aaa正在做第二部分工作........bbb正在做第一部分工作........
aaa正在做第三部分工作........
aaa工作完成...........

bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
主線程結束.....

(2)
aaa正在做第一部分工作........
bbb正在做第一部分工作........
bbb正在做第二部分工作........
bbb正在做第三部分工作........
bbb工作完成...........
aaa正在做第二部分工作........
aaa正在做第三部分工作........
aaa工作完成...........
主線程結束.....

(3)
aaa正在做第一部分工作........bbb正在做第一部分工作........

bbb正在做第二部分工作........
bbb正在做第三部分工作........aaa正在做第二部分工作........
aaa正在做第三部分工作........

bbb工作完成...........
aaa工作完成...........
主線程結束.....

2.通過繼承Thread類創建線程

當通過繼承Thread類來創建線程時,需要傳入參數,可以在構造方法增加相應的屬性,以此來傳入所需要的參數。
Thread類有一個run方法,當創建一個線程後,使用start方法時,實際上就是在調用類裏面的run方法,因此可以在繼承Thread類的時候,重寫run方法來完成自己的任務。

import threading

class Jobthread(threading.Thread):
    def __init__(self,name):
        super(Jobthread, self).__init__()
        self.name = name
    #重寫Thread類的run方法
    def run(self):
        print(‘%s線程待完成第一部分工作‘ %self.name)
        print("%s正在做第二部分工作........" %self.name)
        print("%s正在做第二部分工作........" %self.name)

if __name__ == "__main__":
    #實例化類創建第一個線程對象
    t1 = Jobthread(‘aaa‘)
    t1.start()
    #實例化類創建第二個線程對象
    t2 = Jobthread(‘bbb‘)
    t2.start()
    t1.join()
    t2.join()
    print(‘主線程結束.....‘)

每次運行程序的結果:

(1)
wwww正在做第一部分工作
wwww正在做第二部分工作........
eeee正在做第一部分工作
eeee正在做第二部分工作........
eeee正在做第二部分工作........
wwww正在做第二部分工作........
主線程結束.....

(2)
wwww正在做第一部分工作
eeee正在做第一部分工作
wwww正在做第二部分工作........
eeee正在做第二部分工作........
wwww正在做第二部分工作........
eeee正在做第二部分工作........
主線程結束.....

(3)
wwww正在做第一部分工作eeee正在做第一部分工作
eeee正在做第二部分工作........

wwww正在做第二部分工作........
wwww正在做第二部分工作........
eeee正在做第二部分工作........
主線程結束.....

可以看到,通過繼承線程類,然後重寫run方法,實例化這個類,這樣也可以新創建線程,在某些情況下,這樣還更加方便。

3.守護線程-daemon

線程的Daemon屬性:當主線程執行結束, 讓沒有執行完成的線程強制結束的一個屬性:daemon
setDaemon方法是改變線程類的一個屬性:daemon,也可以在創建線程的時候指定這個屬性的值,他的值默認為None

import threading
import time

# 任務1:
def music(name):
    for i in range(2):
        print("正在聽音樂%s" %(name))
        time.sleep(2)
        print(‘聽音樂結束‘)
# 任務2:
def code(name):
    for i in range(2):
        print("正在編寫代碼%s" %(name))
        time.sleep(2)
        print(‘寫代碼結束‘)

if __name__ == ‘__main__‘:
    t1 = threading.Thread(target=music, args=("中國夢",))
    t2 = threading.Thread(target=code, args=("爬蟲", ))

    # 將t1線程聲明為守護線程, 如果設置為True, 子線程啟動, 當主線程執行結束, 子線程也結束
    # 設置setDaemon必須在啟動線程之前進行設置;
    t1.setDaemon(True)
    t2.setDaemon(True)
    t1.start()
    t2.start()
    print(‘完成任務......‘)

運行結果:

(1)
正在聽音樂中國夢正在編寫代碼爬蟲
完成任務......

(2)
正在聽音樂中國夢
正在編寫代碼爬蟲完成任務......

當設置daemon屬性為True,就和_thread模塊的線程一樣主線程結束,其它線程也被迫結束

4.線程中的鎖

1.全局解釋鎖

什麽是全局解釋器鎖(GIL)
Python代碼的執行由Python 虛擬機(也叫解釋器主循環,CPython版本)來控制,Python 在設計之初就考慮到要在解釋器的主循環中,同時只有一個線程在執行,即在任意時刻,只有一個線程在解釋器中運行。對Python 虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。
即全局解釋器鎖,使得在同一時間內,python解釋器只能運行一個線程的代碼,這大大影響了python多線程的性能。
需要明確的一點是GIL並不是Python的特性
GIL是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念裏CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷。
python GIL 會影響多線程等性能的原因:
因為在多線程的情況下,只有當線程獲得了一個全局鎖的時候,那麽該線程的代碼才能運行,而全局鎖只有一個,所以使用python多線程,在同一時刻也只有一個線程在運行,因此在即使在多核的情況下也只能發揮出單核的性能。
經過GIL這一道關卡處理,會增加執行的開銷。這意味著,如果你想提高代碼的運行速度,使用threading包並不是一個很好的方法。
在多線程環境中,Python 虛擬機按以下方式執行:

  1. 設置GIL
  2. 切換到一個線程去運行
  3. 運行:
    a. 指定數量的字節碼指令,或者
    b. 線程主動讓出控制(可以調用time.sleep(0))
  4. 把線程設置為睡眠狀態
  5. 解鎖GIL
  6. 再次重復以上所有步驟
    既然python在同一時刻下只能運行一個線程的代碼,那線程之間是如何調度的呢?
    對於有io操作的線程,當一個線程在做io操作的時候,因為io操作不需要cpu,所以,這個時候,python會釋放python全局鎖,這樣其他需要運行的線程就會使用該鎖。
    對於cpu密集型的線程,比如一個線程可能一直需要使用cpu做計算,那麽python中會有一個執行指令的計數器,當一個線程執行了一定數量的指令時,該線程就會停止執行並讓出當前的鎖,這樣其他的線程就可以執行代碼了。
    由上面可知,至少有兩種情況python會做線程切換,一是一但有IO操作時,會有線程切換,二是當一個線程連續執行了一定數量的指令時,會出現線程切換。當然此處的線程切換不一定就一定會切換到其他線程執行,因為如果當前線程優先級比較高的話,可能在讓出鎖以後,又繼續獲得鎖,並優先執行。

這裏就可以將操作分兩種:
i/o密集型
cpu密集型(計算密集型)
對於前者我們盡可能的采用多線程方式,後者盡可能采用多進程方式

2.線程鎖

為什麽會需要線程鎖?
多個線程對同一個數據進行修改時, 會出現不可預料的情況。
例如:

def add():
    global money
    for i in range(1000000):
        money += 1

def reduce():
    global money
    for  i in range(1000000):
        money -= 1

if __name__ =="__main__":
    money = 0
    t1 = threading.Thread(target=add)
    t2 = threading.Thread(target=reduce)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(money)

因為沒有對變量money做訪問限制,在某一個線程對其進行操作時,另一個線程仍可以對它進行訪問、操作,致使最終結果出錯,且不可預料,不是期待值。

(1)
55651
(2)
-133447
(3)
-236364

當我們使用線程鎖的時候:

import threading

def add(lock):
    global money
    lock.acquire()
    for i in range(100000):
        money += 1
    lock.release()

def reduce(lock):
    global money
    lock.acquire()
    for  i in range(1000000):
        money -= 1
    lock.release()

if __name__ =="__main__":
    money = 0
    lock = threading.Lock()
    t1 = threading.Thread(target=add,args=(lock,))
    t2 = threading.Thread(target=reduce,args=(lock,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(money)

運行結果正確,始終為0

5.多線程

使用多線程來查ip的地理位置

import json
from urllib.request import urlopen

class Job(threading.Thread):
    def __init__(self,ip):
        super(Job,self).__init__()
        self.ip = ip
    def check_ip(self):
        url = ‘http://ip.taobao.com/service/getIpInfo.php?ip=%s‘ % self.ip
        text = urlopen(url).read().decode(‘utf-8‘)
        d = json.loads(text)[‘data‘]
        country = d[‘country‘]
        city = d[‘city‘]
        print(self.ip+‘:\t‘+country+‘\t‘+city)
    def run(self):
        self.check_ip()

if __name__ == "__main__":
    tt = []
    ips = [‘172.25.254.23‘, ‘111.213.215.66‘, ‘152.158.32.54‘, ‘164.52.196.89‘,‘214.63.145.189‘]
    for ip in ips:
        t = Job(ip)
        t.start()
        tt.append(t)
    [i.join() for i in tt]

結果:

172.25.254.23:  XX  內網IP
214.63.145.189: 美國  XX
164.52.196.89:  印度  XX
111.213.215.66: 中國  上海
152.158.32.54:  歐洲  XX

6.生產者消費者模型

1.模型引入

1). 理論上多線程執行任務, 會產生一些數據, 為其他程序執行作鋪墊;
2). 多線程是不能返回任務執行結果的, 因此需要一個容器來存儲多線程產生的數據
3). 這個容器如何選擇? list(棧, 隊列), tuple(x), set(x), dict(x), 此處選擇隊列來實現
隊列與多線程

import threading
from queue import Queue

def job(l,queue):
    # 將任務的結果存儲到隊列中
    queue.put(sum(l))

def use_thread():
    # 實例化一個隊列, 用來存儲每個線程執行的結果
    q = Queue()
    li = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]
    threads = []
    for i in li:
        t = threading.Thread(target=job,args=(i,q))
        threads.append(t)
        t.start()
    # join方法等待所有子線程執行結束
    [i.join() for i in threads]
    # 從隊列裏面拿出所有的運行結果
    result = [q.get() for i in li]
    print(result)

if __name__ == "__main__":
    use_thread()

運行結果:

[21, 27, 33, 39]

2.生產者消費者模型

在軟件開發的過程中,經常碰到這樣的場景:
某些模塊負責生產數據,這些數據由其他模塊來負責處理(此處的模塊可能是:函數、線程、進程等)。產生數據的模塊稱為生產者,而處理數據的模塊稱為消費者。在生產者與消費者之間的緩沖區稱之為倉庫。生產者負責往倉庫運輸商品,而消費者負責從倉庫裏取出商品,這就構成了生產者消費者模式。
為了容易理解,我們舉一個寄信的例子。假設你要寄一封信,大致過程如下:
1、你把信寫好——相當於生產者生產數據
2、你把信放入郵箱——相當於生產者把數據放入緩沖區
3、郵遞員把信從郵箱取出,做相應處理——相當於消費者把數據取出緩沖區,處理數據
生產者消費者模式的優點
1.解耦
假設生產者和消費者分別是兩個線程。如果讓生產者直接調用消費者的某個方法,那麽生產者對於消費者就會產生依賴(也就是耦合)。如果未來消費者的代碼發生變化,可能會影響到生產者的代碼。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。
舉個例子:我們去郵局投遞信件,如果不使用郵箱(也就是緩沖區),你必須得把信直接交給郵遞員。有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他。這就產生了你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。萬一哪天郵遞員 換人了,你還要重新認識一下(相當於消費者變化導致修改生產者代碼)。而郵箱相對來說比較固定,你依賴它的成本就比較低(相當於和緩沖區之間的弱耦合)。
2.並發
由於生產者與消費者是兩個獨立的並發體,他們之間是用緩沖區通信的,生產者只需要往緩沖區裏丟數據,就可以繼續生產下一個數據,而消費者只需要從緩沖區拿數據即可,這樣就不會因為彼此的處理速度而發生阻塞。
繼續上面的例子:如果我們不使用郵箱,就得在郵局等郵遞員,直到他回來,把信件交給他,這期間我們啥事兒都不能幹(也就是生產者阻塞)。或者郵遞員得挨家挨戶問,誰要寄信(相當於消費者輪詢)。
3.支持忙閑不均
當生產者制造數據快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中,慢慢處理掉。而不至於因為消費者的性能造成數據丟失或影響生產者生產。
我們再拿寄信的例子:假設郵遞員一次只能帶走1000封信,萬一碰上情人節(或是聖誕節)送賀卡,需要寄出去的信超過了1000封,這時候郵箱這個緩沖區就派上用場了。郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時再拿走。

實例:
1.文件ipfile.txt中有大量的ip地址,要求將ip地址取出來再與端口號組合,放入隊列中
2.從隊列中取出地址,依次訪問並返回訪問結果

import  threading
from queue import Queue
from urllib.request import urlopen
import time

class Procuder(threading.Thread):
    def __init__(self, q):
        super(Procuder, self).__init__()
        self.q = q
    def run(self):
        portlist = [80, 443, 7001, 8000, 8080]
        with open(‘ipfile.txt‘) as f:
            for ip in f:
                for port in portlist:
                    url = ‘http://%s:%s‘ % (ip.strip(), port)
                    self.q.put(url)

class Consumer(threading.Thread):
    def __init__(self, q):
        super(Consumer, self).__init__()
        self.q = q
    def run(self):
        # 阻塞0.001妙使生產者先運行再隊列中放入數據
        time.sleep(0.001)
        #只要隊列不為空就一直“消費”數據
        while not self.q.empty():
            try:
                url = self.q.get()
                urlObj = urlopen(url)
            except Exception as e:
                print("%s unknown url" %(url))
            else:
                print("%s is ok" %(url))

if __name__ == ‘__main__‘:
    q = Queue(20)
    p1 = Procuder(q)
    p1.start()
    c = Consumer(q)
    c.start()
    # 阻塞調用線程,直到隊列中的所有任務被處理掉,再繼續向下執行。
    q.join()

運行結果就不截圖了。

7.線程池

傳統多線程方案會使用“即時創建, 即時銷毀”的策略。盡管與創建進程相比,創建線程的時間已經大大的縮短,但是如果提交給線程的任務是執行時間較短,而且執行次數極其頻繁,那麽服務器將處於不停的創建線程,銷毀線程的狀態。

一個線程的運行時間可以分為3部分:線程的啟動時間、線程體的運行時間和線程的銷毀時間。在多線程處理的情景中,如果線程不能被重用,就意味著每次創建都需要經過啟動、銷毀和運行3個過程。這必然會增加系統相應的時間,降低了效率。

使用線程池:
由於線程預先被創建並放入線程池中,同時處理完當前任務之後並不銷毀而是被安排處理下一個任務,因此能夠避免多次創建線程,從而節省線程創建和銷毀的開銷,能帶來更好的性能和系統穩定性。

#導入模塊 註意: python3.2版本以後才可以使用;
from concurrent.futures import ThreadPoolExecutor
import time

#需要完成的任務
def job(n):
    sum = 0
    for i in range(1,n+1):
        sum += i
    return sum

if __name__ =="__main__":
    #實例化線程池對象,設置線程池有10個線程
    pool = ThreadPoolExecutor(max_workers=10)

    #向線程池提交任務submit方法返回一個_base.Future對象,這個對象含有許多方法。便於我們對線程操作
    f1 = pool.submit(job,20)
    f2 = pool.submit(job,16)

    #查看線程是否完成任務(線程是否被銷毀,完成任務的線程會被釋放)
    #這裏休眠1妙是因為線程在完成工作後會被釋放,如果立即查看線程狀態,可能線程正在釋放中,會返False,這裏等待1妙讓線程完成釋放之後在查看線程狀態。
    time.sleep(1)
    print(f1.done())
    print(f2.done())

    #直接獲取任務執行結果
    print(f1.result())
    print(f2.result())

運行結果:

True
True
210
136

實現線程池的三種方法(實際可以看成2種)

concurrent.futures.ThreadPoolExecutor,在提交任務的時候,有兩種方式,一種是submit()函數,另一種是map()函數,兩者的主要區別在於:
(1)map可以保證輸出的順序, submit輸出的順序是亂的
(2)如果你要提交的任務的函數是一樣的,就可以簡化成map。但是假如提交的任務函數是不一樣的,或者執行的過程之可能出現異常(使用map執行過程中發現問題會直接拋出錯誤)就要用到submit()
(3)submit和map的參數是不同的,submit每次都需要提交一個目標函數和對應的參數,map只需要提交一次目標函數,目標函數的參數放在一個叠代器(列表,字典)裏就可以。

from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from urllib.request import urlopen

urls = [‘http://a.cn‘,‘http://1688.cn‘,‘http://jd.cn‘,‘http://qq.cn‘,‘http://qq.com‘,‘http://111.231.215.66‘]*10

def get_page(url):
    try:
        content = urlopen(url).read()
    except:
        return {‘url:‘+url+‘   page_len:‘+str(0)}
    else:
        return {‘url:‘+url+‘   page_len:‘+str(len(content))}

# 1.通過for循環打印結果
print(‘第一種方法:‘)
pool = ThreadPoolExecutor(max_workers=10)
resultlist = [pool.submit(get_page,url) for url in urls]
for i in resultlist:
    print(i.result())

# 2.通過方法as_completed
print(‘第二種方法:‘)
pool = ThreadPoolExecutor(max_workers=10)
resultlist = [pool.submit(get_page,url) for url in urls]
for i in as_completed(resultlist):
    print(i.result())

# 3.通過map方法
print(‘第三種方法:‘)
pool = ThreadPoolExecutor(max_workers=10)
for res in pool.map(get_page,urls):
    print(res)

Python學習—python中的線程