1. 程式人生 > >python 執行緒

python 執行緒

程序 執行緒的理解

  • 程式: /bin/firefox一個二進位制程式, 也可以包含任意可執行的程式, 是一個真實存在的實體。
  • 程序: 執行程式過程中產生一系列內容.(程序資訊儲存)
  • 執行緒: 每個程序裡面至少包含一個主執行緒, (firefox裡面有多個table)
  • 多程序: 瀏覽器,網易雲音樂以及pycharm 三個軟體只能順序執行是怎樣一種場景呢?本來在python程式設計時想要聽個古典音樂來點靈感, 結果發現,對於CPU來說, 一次只能執行一個程式, 太崩潰, 另外,假如有兩個程式A和B,程式A在執行到一半的過程中,需要讀取大量的資料輸入(I/O操作),而此時CPU只能靜靜 地等待任務A讀取完資料才能繼續執行,這樣就白白浪費了CPU資源。你是不是已經想到在程式A讀取資料的過程中,讓程式B去執行,當程式A讀取完資料之後,讓程式B暫停。這裡有一個關鍵詞:切換。
建立退出排程時間到事件發生事件等待新建就緒執行終止阻塞
  • 多執行緒:多執行緒就是指一個程序中同時有多個執行路徑(執行緒)正在執行。但在ptthon中,有一個GIL鎖:Global Interpreter Lock,任何Python執行緒執行前,必須先獲得GIL鎖。多執行緒程式設計,模型複雜,容易發生衝突,必須用鎖加以隔離,同時,又要小心死鎖的發生。 Python直譯器由於設計時有GIL全域性鎖,導致了多執行緒無法利用多核。多執行緒的併發在Python中就是一個美麗的夢。
建立執行緒及建立多執行緒
建立執行緒
import threading
import time
def job():
    print("這是一個需要執行的任務")
    # 啟用的執行緒個數
    print("當前執行緒的個數:",threading.active_count())
    # 列印當前執行緒的詳細資訊
    print("當前執行緒資訊:",threading.current_thread())
    time.sleep(5)
if __name__ == '__main__':
    job()

在這裡插入圖片描述

建立多執行緒
  • _thread模組(不常用)
import time
import _thread
def job(name):
    print("這是一個需要執行的任務")
    print(name,time.ctime())
    time.sleep(5)

if __name__ == '__main__':
  # 建立多個執行緒, 但是沒有開始執行任務;
    _thread.start_new_thread(job,('thread1',))
    _thread.start_new_thread(job, ('thread2',))
    while True:
        pass

在這裡插入圖片描述

  • threading模組 通過例項化物件實現
import threading
import time
def job(name):
    print("這是一個需要執行的任務")
    print("當前執行緒的個數:",threading.active_count())
    print("當前執行緒資訊:",threading.current_thread())
    time.sleep(5)
    print(name,time.ctime())
if __name__ == '__main__':
    job('job0')
    # 建立多個執行緒
    t1 = threading.Thread(target=job,name='job1',args=("job1-name",))
    t2 = threading.Thread(target=job, name='job2', args=("job2-name",))
    t1.start()
    t2.start()
    # 等待所有的子執行緒執行結束之後, 繼續執行主執行緒的內容;
    t1.join()
    t2.join()
    print('hello')

在這裡插入圖片描述

  • threading 通過繼承實現
import threading
import time
# 類的繼承
class Job(threading.Thread):
 # 重寫構造方法;如果執行的任務需要傳遞引數, 那將引數通過建構函式與self繫結;
    def __init__(self,jobname):
        super(Job,self).__init__()
        self.jobname=jobname
    # 將多執行緒需要執行的任務重寫到run方法中;
    def run(self):
        print("這是一個需要執行的任務")
        print("當前執行緒的個數:", threading.active_count())
        print("當前執行緒資訊:", threading.current_thread())
        time.sleep(5)
        print(self.jobname,time.ctime())
if __name__=='__main__':
    t1 = Job(jobname='job1')
    t2 = Job(jobname='job2')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('Suceess')

在這裡插入圖片描述

用多執行緒和不用多執行緒效率對比
import threading
import time
from mytimeit import timeit
def music(name):
    for i in range(2):
        print("正在聽音樂%s"%(name))
        time.sleep(1)
def code(name):
    for i in range(2):
        print("正在編寫程式碼%s"%(name))
        time.sleep(2)
@timeit
def use_thread():
    t1 = threading.Thread(target=music,args=("去年夏天",))
    t2 = threading.Thread(target=code, args=("爬蟲",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
@timeit
def not_use_thread():
    music("去年夏天")
    code("爬蟲")
if __name__=='__main__':
    use_thread()
    not_use_thread()

在這裡插入圖片描述

多執行緒的join方法

join方法: 會等待, 直到t1執行緒執行結束;阻塞正在呼叫的執行緒

def job(name):
    time.sleep(1)
    print(name)
t1 = threading.Thread(target=job,args=('job1',))
t1.start()
t1.join()   # 會等待, 直到t1執行緒執行結束;阻塞正在呼叫的執行緒
t2 = threading.Thread(target=job,args=('job2',))
t2.start()
t2.join()   # 會等待, 直到t2執行緒執行結束;阻塞正在呼叫的執行緒
print("main thread end")
守護執行緒

將t1執行緒生命為守護執行緒, 如果設定為True, 子執行緒啟動, 當主執行緒執行結束, 子執行緒也結束 t1.setDaemon(True)

# 當主執行緒執行結束, 讓沒有執行的執行緒強制結束;set_daemon
import threading
import time
def music(name):
    for i in range(2):
        print("正在聽音樂%s"%(name))
        time.sleep(1)
def code(name):
    for i in range(2):
        print("正在編寫程式碼%s"%(name))
        time.sleep(2)
if __name__=='__main__':
    start_time=time.time()
    t1 = threading.Thread(target=music,args=('去年夏天',))
    t2 = threading.Thread(target=code,args=('爬蟲',))
     # 將t1執行緒生命為守護執行緒, 如果設定為True, 子執行緒啟動, 當主執行緒執行結束, 子執行緒也結束
    # 設定setDaemon必須在啟動執行緒之前進行設定;
    t1.setDaemon(True)
    t2.setDaemon(True)
    t1.start()
    t2.start()
    print('花費時間:%s'%(time.time()-start_time))

在這裡插入圖片描述

執行緒鎖

為什麼會需要執行緒鎖? 多個執行緒對同一個資料進行修改時, 肯能出現不可預料的情況.

import threading
def add(lock):
 #  操作變數之前進行加鎖
    lock.acquire()
    global money
    for i in range(1000000):
        money += 1
 # 操作變數完成後進行解鎖
    lock.release()
def reduce(lock):
 # 操作變數之前進行加鎖
    lock.acquire()
    global money
    for i in range(1000000):
        money -= 1
 # 操作變數完成後進行解鎖
    lock.release()
if __name__=='__main__':
    money=0
    # 例項化一個鎖物件
    lock = threading.Lock()
    t1 = threading.Thread(target=add,args=(lock,))
    t2 = threading.Thread(target=reduce, args=(lock,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("最終金額為:%s" %(money))

若不枷鎖: 在這裡插入圖片描述 枷鎖: 在這裡插入圖片描述

GIL全域性直譯器

python使用多執行緒 , 是個好主意麼? 為什麼? - GIL(全域性直譯器鎖) - python直譯器預設每次只允許一個執行緒執行 執行過程: 1). 設定GIL 2). 切換到執行緒去執行對應的任務; 3). 執行 - 執行完了 - time.sleep() - 獲取其他資訊才能繼續執行, eg: 從網路上獲取網頁資訊等; 3. 把執行緒設定為睡眠狀態 4. 解鎖GIL 5.再次重複執行上述內容; python直譯器:Cpython直譯器, Jpython直譯器, p-python直譯器

方法的選擇: Python並不支援真正意義上的多執行緒。Python中提供了多執行緒包,但是如果你想通過多執行緒提高程式碼的速度, 使用多執行緒包並不是個好主意。Python中有一個被稱為Global Interpreter Lock(GIL)的東西, 它會確保任何時候你的多個執行緒中,只有一個被執行。執行緒的執行速度非常之快,會讓你誤以為執行緒是並行執行的, 但是實際上都是輪流執行。經過GIL這一道關卡處理,會增加執行的開銷。這意味著,如果你想提高程式碼的執行速度, 使用threading包並不是一個很好的方法。 # I/O密集型操作: 多執行緒操作 # CPU/計算密集型:多程序操作

import threading
from mytimeit import timeit
def job(l):
    sum(l)
@timeit
def use_thread():
    li = range(1,10000)
    for i in range(5):
        t = threading.Thread(target=job,args=(li,))
        t.start()
@timeit
def use_no_thread():
    li = range(1,10000)
    job(li)
if __name__=='__main__':
    use_thread()
    use_no_thread()

在這裡插入圖片描述

多執行緒資料的儲存與傳送 -----佇列

1). 理論上多執行緒執行任務, 會產生一些資料, 為其他程式執行作鋪墊; 2). 多執行緒是不能返回任務執行結果的, 因此需要一個容器來儲存多執行緒產生的資料 3). 這個容器如何選擇? list(棧, 佇列)

import threading
from queue import Queue
def job(l,queue):
 # 將任務的結果儲存到佇列中;
    queue.put(sum(l))
def use_thread():
 # 例項化一個佇列, 用來儲存每個執行緒執行的結果;
    q =Queue()
    li= [[1,2,3,4,5], [2,3,4,5,6], [2,3,4,5,6,7,8], [2,3,4,5,6]]
    threads = []
    for i in li:
        t = threading.Thread(target=job,args=(i,q))
        threads.append(t)
        t.start()
     # join方法等待所有子執行緒之心結束
    [thread.join() for thread in threads]
    # 從佇列裡面拿出所有的執行結果
    result = [q.get() for _ in li]
    print(result)
if __name__=="__main__":
    use_thread()

在這裡插入圖片描述

生產者消費者模型

給定200個ip地址, 可能開放埠為80, 443, 7001, 7002, 8000, 8080, 9000(flask), 9001 以http://ip:port形式訪問頁面以判斷是否正常訪問. 1). 任務1:構建所有的url地址;===儲存到一個數據結構中 2). 任務2:依次判斷url址是否可以成功訪問 用列表儲存:

import threading
from urllib.request import urlopen
def creat_data():
    with open('ips.txt','w') as f:
        for i in range(200):
            f.write('172.25.254.%d\n'%(i+1))
def creat_url():
    portlist = [80,443,7001,7002,8000,8080]
    with open('ips.txt') as f:
        ips = [ip.strip()for ip in f]
    urls = ['http://%s:%s'%(ip,port) for ip in ips for port in portlist]
    return urls
def job(url):
    try:
        urlobj=urlopen(url)
    except Exception as e:
        print("%s unknown url"%(url))
    else:
        print("%s is ok"%(url))
if __name__=='__main__':
    creat_data()
    urls = creat_url()
    threads = []
    for url in urls:
        t = threading.Thread(target=job,args=(url,))
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]

在這裡插入圖片描述 用佇列:

import  threading
from queue import Queue
from urllib.request import urlopen
class Producer(threading.Thread):
    def __init__(self, queue):
        super(Producer, self).__init__()
        self.queue = queue
    def run(self):
        portlist = [80, 443, 7001, 7002, 8000, 8080]
        with open('ips.txt') as f:
            ips = [ip.strip() for ip in f]
        # 每生產一個url地址, 就將生產的資料放到佇列裡面;
        for ip in ips:
            for port in portlist:
                url = 'http://%s:%s' % (ip, port)
                self.queue.put(url)
        # urls = ['http://%s:%s' % (ip, port) for ip in ips for port in portlist]
        # return urls
class Consumer(threading.Thread):
    def __init__(self, queue):
        super(Consumer, self).__init__()
        self.queue = queue
    def run(self):
        try:
            url = self.queue.get()
            urlObj = urlopen(url)
        except Exception as e:
            print("%s unknown url" % (url))
        else:
            print("%s is ok" % (url))

if __name__ == '__main__':
    queue = Queue()
    # 一個執行緒物件, 生產者
    p1 = Producer(queue)
    p1.start()
    # 消費者啟動多個執行緒(啟動20個)
    for i in range(20):
        c1 = Consumer(queue)
        c1.start()

多執行緒獲取IP地址

import json
import threading
from urllib.request import urlopen
from mytimeit import timeit
def job(ip):
    #API
    url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (ip)
    # 根據url獲取網頁的內容, 並且解碼為utf-8格式, 識別中文;
    text = urlopen(url).read().decode('utf-8')
    # 將獲取的字串型別轉換為字典, 方便處理
    d = json.loads(text)['data']
    country = d['country']
    city = d['city']
    print("%s:"%(ip),country,city)
@timeit
def has_many_thread():
    threads = []
    ips = ['172.25.254.78', '8.8.8.8',
           '172.25.254.78', '8.8.8.8',
           '172.25.254.78', '8.8.8.8']
    for ip in ips:
     # 例項化執行緒物件
        t = threading.Thread(target=job,args=(ip,))
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]
@timeit
def has_no_thread():
    ips = ['172.25.254.250', '8.8.8.8',
           '172.25.254.250', '8.8.8.8',
           '172.25.254.250', '8.8.8.8']
    for ip in ips:
        job(ip)
if __name__=='__main__':
    has_many_thread()
    has_no_thread()

在這裡插入圖片描述

ThreadPool執行緒池

注意: python3.2版本以後才可以使用;

from concurrent.futures import ThreadPoolExecutor
import time
def job():
    print("this is a job")
    return 'hello'
if __name__=='__main__':
# 例項化物件, 執行緒池包含10個執行緒來處理任務;
    pool = ThreadPoolExecutor(max_workers=10)
 # 往執行緒池裡面扔需要執行的任務, 返回一個物件,( _base.Future例項化出來的)
    f1 = pool.submit(job)
    f2 = pool.submit(job)
 # 判斷任務是否執行結束   
    print(f1.done())
    time.sleep(1)
    print(f2.done())
  # 獲取任務執行的結果  
    print(f1.result())
    print(f2.result())

在這裡插入圖片描述

執行緒池與map函式
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.error import HTTPError
from urllib.request import urlopen
from mytimeit import timeit
URLS = ['http://httpbin.org', 'http://example.com/',
        'https://api.github.com/'] * 10
def get_page(url,timeout=3):
    try:
        content = urlopen(url).read()
        return {'url':url,'len':len(content)}
    except HTTPError as e:
        return {'url':url,'len':0}
@timeit
def method_1():
# 方法1: submit提交任務
    pool =ThreadPoolExecutor(max_workers=20)
    futuresobj = [pool.submit(get_page,url) for url in URLS]
    # 注意: 傳遞的時包含futures物件的序列, as_complete, 返回已經執行完任務的future物件,
   # 直到所有的future對應的任務執行完成, 迴圈結束; 
    # for finish_fs in as_completed(futuresobj):
    #     print(finish_fs.result())
    for future in futuresobj:
        print(future.result())
@timeit
def method_2():
# 方法2:通過map方式執行
    pool = ThreadPoolExecutor(max_workers=20)
    for res in pool.map(get_page,URLS):
        print(res)
method_2()

在這裡插入圖片描述

多執行緒批量管理主機
# 基於ssh用於連線遠端伺服器做操作:遠端執行命令, 上傳檔案, 下載檔案
import threading
from concurrent.futures import ThreadPoolExecutor

import  paramiko
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException
def connect(cmd, hostname, port=22, user='root'):
    # ssh [email protected]
    # 建立一個ssh物件;
    client = paramiko.SSHClient()

    # 返回一個私鑰物件
    private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
    # 2. 解決問題:如果之前沒有;連線過的ip, 會出現
    # Are you sure you want to continue connecting (yes/no)? yes
    # 自動選擇yes
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # 3. 連線伺服器
        client.connect(hostname=hostname,
                       port=port,
                       username=user,
                       pkey=private_key
                      )
        # 4. 執行操作
        stdin, stdout, stderr = client.exec_command(cmd)
    except NoValidConnectionsError as e:
        print("%s連線失敗" %(hostname))
    except AuthenticationException as e:
        print("%s密碼錯誤" %(hostname))
    else:
        # 5. 獲取命令的執行結果;
        result = stdout.read().decode('utf-8')
        print("%s執行結果:" %(hostname), result)
    finally:
        # 6. 關閉連線
        client.close()
 #******方法1:例項化物件實現*********
def method_1():
    threads = []
    for count in range(254):
        host = '172.25.254.%s' % (count + 1)
        # print(host.center(50, '*'))
        t = threading.Thread(target=connect, args=('uname', host))
        threads.append(t)
        t.start()
    # join方法, 等待所有的子執行緒執行結束;
    _ = [thread.join() for thread in threads]
    print("任務執行結束........")
# ******方法2: 執行緒池只有50個執行緒處理所有的任務********
def method_2():
    # 建立執行緒池物件
    pool = ThreadPoolExecutor(max_workers=50)

    # 依次向執行緒池提交任務
    for count in range(254):
        host = '172.25.254.%s' % (count + 1)
        pool.submit(connect, 'uname', host)
method_2()

繼承的方式實現:

import threading
import paramiko
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException
class IpThread(threading.Thread):
    def __init__(self,cmd,hostname,port=22,user='root'):
        super(IpThread, self).__init__()
        self.cmd=cmd
        self.hostname=hostname
        self.port=port
        self.user=user
    def run(self):
        client = paramiko.SSHClient()
        private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(hostname=self.hostname,
                           port=self.port,
                           username=self.user,
                           pkey=private_key
                           )
            stdin, stdout, stderr = client.exec_command(self.cmd)
        except NoValidConnectionsError as e:
            print("%s連線失敗" % (self.hostname))
        except AuthenticationException as e:
            print("%s密碼錯誤" % (self.hostname))
        except TimeoutError as  e:
            print("%s連線超時" % (self.hostname))
        else:
            result = stdout.read().decode('utf-8')
            print('%s' % (self.hostname), result)
        finally:
            client.close()
def use_thread():
    threads = []
    ips = ['172.25.254.%s' %(i)  for i in range(1,254)]
    for ip in ips:
        t = IpThread(cmd='uname',hostname=ip)
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]
if __name__ == '__main__':
    use_thread()

這裡寫圖片描述