python並發編程補充
1、信號量
互斥鎖:同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據
如果指定信號量為3,那麽來一個人獲得一把鎖,計數加1,當計數等於3時,後面的人均需要等待。一旦釋放,就有人可以獲得一把鎖,信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念。
from multiprocessing import Process,Semaphore import time,random def action(sem,user): sem.acquire() print('%s 占一個位置' %user) time.sleep(random.randint(0,3)) #模擬進程執行時間 sem.release() if __name__ == '__main__': sem=Semaphore(5) p_l=[] #存放啟動的進程 for i in range(13): p=Process(target=action,args=(sem,'user%s' %i,)) p.start() p_l.append(p) for i in p_l: i.join() #保證所以子進程執行完畢 print('============》')
2、事件
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麽當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麽event.wait 方法時便不再阻塞。
clear:將“Flag”設置為False
set:將“Flag”設置為True
3、進程池
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,並行操作可以節約大量的時間。多進程是實現並發的手段之一,需要註意的問題是:
(1)很明顯需要並發執行的任務通常要遠大於核數
(2)一個操作系統不可能無限開啟進程,通常有幾個核就開幾個進程
(3)進程開啟過多,效率反而會下降(開啟進程是需要占用系統資源的,而且開啟多余核數目的進程也無法做到並行)
我們就可以通過維護一個進程池來控制進程數目
對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麽就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麽該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然後自始至終使用這三個進程去執行所有任務,不會開啟其他進程.
4、使用進程池維護固定數目的進程
# 開啟6個客戶端,會發現2個客戶端處於等待狀態
# 服務端
from socket import * from multiprocessing import Pool import os server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): print('進程pid: %s' %os.getpid()) while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p=Pool() #Pool內的進程數默認是cpu核數,假設為4(查看方法os.cpu_count()) while True: conn,client_addr=server.accept() p.apply_async(talk,args=(conn,client_addr)) # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問
# 客戶端
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
5、回調函數
需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數
我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
# 爬蟲案例 from multiprocessing import Pool import time,random import requests import re def get_page(url,pattern): response=requests.get(url) if response.status_code == 200: return (response.text,pattern) def parse_page(info): page_content,pattern=info res=re.findall(pattern,page_content) for item in res: dic={ 'index':item[0], 'title':item[1], 'actor':item[2].strip()[3:], 'time':item[3][5:], 'score':item[4]+item[5] } print(dic) if __name__ == '__main__': pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S) url_dic={ 'http://maoyan.com/board/7':pattern1, } p=Pool() res_l=[] for url,pattern in url_dic.items(): res=p.apply_async(get_page,args=(url,pattern),callback=parse_page) res_l.append(res) for i in res_l: i.get() # res=requests.get('http://maoyan.com/board/7') # print(re.findall(pattern,res.text)) '''結果: {'index': '1', 'title': '神秘巨星', 'actor': '阿米爾·汗,塞伊拉·沃西,梅·維賈', 'time': '2018-01-19', 'score': '9.5'} {'index': '2', 'title': '奇跡男孩', 'actor': '雅各布·特瑞布雷,朱莉婭·羅伯茨,歐文·威爾遜', 'time': '2018-01-19', 'score': '9.3'} {'index': '3', 'title': '小狗奶瓶', 'actor': '奶瓶,康瀟諾,魏子涵', 'time': '2018-02-02', 'score': '9.3'} {'index': '4', 'title': '公牛歷險記', 'actor': '約翰·塞納,莉莉·戴,凱特·邁克金農', 'time': '2018-01-19', 'score': '9.2'} {'index': '5', 'title': '前任3:再見前任', 'actor': '韓庚,鄭愷,於文文', 'time': '2017-12-29', 'score': '9.2'} {'index': '6', 'title': '一個人的課堂', 'actor': '孫海英,韓三明,王乃訓', 'time': '2018-01-16', 'score': '9.2'} {'index': '7', 'title': '芳華', 'actor': '黃軒,苗苗,鐘楚曦', 'time': '2017-12-15', 'score': '9.1'} {'index': '8', 'title': '南極之戀', 'actor': '趙又廷,楊子姍', 'time': '2018-02-01', 'score': '9.0'} {'index': '9', 'title': '馬戲之王', 'actor': '休·傑克曼,紮克·埃夫隆,米歇爾·威廉姆斯', 'time': '2018-02-01', 'score': '9.0'} {'index': '10', 'title': '小馬寶莉大電影', 'actor': '奧卓·阿杜巴,艾米莉·布朗特,克裏斯汀·肯諾恩斯', 'time': '2018-02-02', 'score': '8.9'} '''
6、如果在主進程中等待進程池中所有任務都執行完畢後,再統一處理結果,則無需回調函數
from multiprocessing import Pool import time,random,os def work(n): time.sleep(1) return n**2 if __name__ == '__main__': p=Pool() res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) res_l.append(res) p.close() p.join() #等待進程池中所有進程執行完畢 nums=[] for res in res_l: nums.append(res.get()) #拿到所有結果 print(nums) #主進程拿到所有的處理結果,可以在主進程中進行統一進行處理
二、通信線程
1、死鎖現象與遞歸鎖
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程
解決方法,遞歸鎖,在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。
from threading import Thread,RLock import time mutexA=mutexB=RLock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖\033[0m' %self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A鎖\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start()
2、信號量Semaphore
同進程的一樣
Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。
from threading import Thread,Semaphore,current_thread import threading import time,random def func(): with sm: print('%s get sm' %current_thread().getName()) time.sleep(random.randint(1,3)) if __name__ == '__main__': sm=Semaphore(5) for i in range(20): t=Thread(target=func) t.start()
3、Event
同進程的一樣
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。
所以我們需要使用threading庫中的Event對象。對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在初始情況下,Event對象中的信號標誌被
為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,
它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行。
event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度;
event.clear():恢復event的狀態值為False。
# 模擬連接mysql from threading import Thread,Event import threading import time,random def conn_mysql(): count=1 while not event.is_set(): if count > 3: #超過次數就拋出異常鏈接超時 raise TimeoutError('鏈接超時') print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count)) event.wait(1) #等待1秒後接著嘗試連接 count+=1 print('<%s>鏈接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()
4、定時器
# 驗證碼定時器
from threading import Timer import random,time class Code: def __init__(self): self.make_cache() def make_cache(self,interval=5): self.cache=self.make_code() print(self.cache) self.t=Timer(interval,self.make_cache) self.t.start() def make_code(self,n=4): res='' for i in range(n): s1=str(random.randint(0,9)) s2=chr(random.randint(65,90)) res+=random.choice([s1,s2]) return res def check(self): while True: inp=input('>>: ').strip() if inp.upper() == self.cache: print('驗證成功',end='\n') self.t.cancel() break if __name__ == '__main__': obj=Code() obj.check()
5、線程queue
(1)先進先出
import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get()) #first
print(q.get()) #second
print(q.get()) #third
(2)堆棧(後進先出)
import queue
q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get()) #third
print(q.get()) #second
print(q.get()) #first
(3)按照優先級取值
import queue
q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((-5,'b'))
q.put((30,'c'))
print(q.get()) #(-5, 'b')
print(q.get()) #(20, 'a')
print(q.get()) #(30, 'c')
三、Python標準模塊--concurrent.futures
1、介紹
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor: 進程池,提供異步調用
2、ProcessPoolExecutor用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) #異步提交任務 futures.append(future) executor.shutdown(True) #wait=True,等待池內所有任務執行完畢回收完資源後才繼續,wait=False,立即返回,並不會等待池內的任務執行完畢 print('+++>') for future in futures: print(future.result()) #取得結果
3、map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,10)) #map取代for循環submit的操作
4、回調函數
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果
5、同步調用和異步調用
提交任務的兩種方式:
同步調用:提交完任務後,就在原地等待,等待任務執行完畢,拿到任務的返回值,才能繼續下一行代碼,導致程序串行執行
異步調用+回調機制:提交完任務後,不在原地等待,任務一旦執行完畢就會觸發回調函數的執行, 程序是並發執行
#同步調用示例:
from multiprocessing import Pool from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,random,os def task(n): print('%s is ruuning' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 def handle(res): print('handle res %s' %res) if __name__ == '__main__': pool=ProcessPoolExecutor(2) for i in range(5): res=pool.submit(task,i).result() handle(res) pool.shutdown(wait=True) # pool.submit(task,33333) print('主')
# 異步調用示例:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,random,os def task(n): print('%s is ruuning' %os.getpid()) time.sleep(random.randint(1,3)) # res=n**2 # handle(res) return n**2 def handle(res): res=res.result() print('handle res %s' %res) if __name__ == '__main__': pool=ProcessPoolExecutor(2) for i in range(5): obj=pool.submit(task,i) obj.add_done_callback(handle) pool.shutdown(wait=True) print('主')
四、協程
1、原理
基於單線程來實現並發,即只用一個主線程(很明顯可利用的cpu只有一個)情況下實現並發,這就要用到協程
對於單線程下,我們不可避免程序中出現io操作,但如果我們能在自己的程序中控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該線程能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在用戶程序級別將自己的io操作最大限度地隱藏起來,從而可以迷惑操作系統,讓其以為該線程好像是一直在計算,io比較少,從而更多的將cpu的執行權限分配給我們的線程,提高程序的運行效率。
協程的本質就是在單線程下,由用戶自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。
所以需要同時滿足以下條件的解決方案:
(1)可以控制多個任務之間的切換,切換之前將任務的狀態保存下來,以便重新運行時,可以基於暫停的位置繼續執行。
(2)作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換
2、介紹
協程:是單線程下的並發,又稱微線程,纖程。協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的
對比操作系統控制線程的切換,用戶在單線程內控制協程的切換有什麽優缺點?
優點如下:
#1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
#2. 單線程內就可以實現並發的效果,最大限度地利用cpu
缺點如下:
#1. 協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程
#2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程
3、總結協程特點
(1)必須在只有一個單線程裏實現並發
(2)修改共享數據不需加鎖
(3)用戶程序裏自己保存多個控制流的上下文棧
(4)一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,就用到了gevent模塊(select機制))
4、greenlet模塊
#pip3 install greenlet #安裝greenlet模塊 from greenlet import greenlet import time def eat(name): print('%s eat 1' %name) time.sleep(100) g2.switch('wang') print('%s eat 2' %name) g2.switch() def play(name): print('%s play 1' % name) g1.switch() print('%s play 2' % name) g1=greenlet(eat) g2=greenlet(play) g1.switch('wang') 使用greenlet模塊可以非常簡單地實現多個任務的切換,當切到一個任務執行時如果遇到io,那就原地阻塞,這仍然沒有解決遇到IO自動切換來提升效率的問題
5、Gevent模塊(遇到IO阻塞時會自動切換任務)
#pip3 install gevent #安裝Gevent模塊
(1)用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5) #創建一個協程對象g1,spawn括號內第一個參數是函數名,後面可以有多個位置實參或關鍵字實參,都是傳給函數的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合作一步:gevent.joinall([g1,g2]) g1.value #拿到func1的返回值
(2)遇到IO阻塞時會自動切換任務
from gevent import monkey;monkey.patch_all() import gevent import time def eat(name): print('%s eat 1' %name) # gevent.sleep(3) time.sleep(3) print('%s eat 2' %name) def play(name): print('%s play 1' % name) # gevent.sleep(2) time.sleep(3) print('%s play 2' % name) g1=gevent.spawn(eat,'wang') g2=gevent.spawn(play,'li') # gevent.sleep(1) # g1.join() # g2.join() gevent.joinall([g1,g2]) 我們可以用threading.current_thread().getName()來查看每個g1和g2,查看的結果為DummyThread-n,即假線程
(3)爬蟲
from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print('GET: %s' %url) response=requests.get(url) if response.status_code == 200: print('%d bytes received from %s' %(len(response.text),url)) #統計內容的長度 start_time=time.time() gevent.joinall([ gevent.spawn(get_page,'https://www.python.org/'), gevent.spawn(get_page,'https://www.yahoo.com/'), gevent.spawn(get_page,'https://github.com/'), ]) stop_time=time.time() print('run time is %s' %(stop_time-start_time))
(4)通過gevent實現單線程下的socket並發(from gevent import monkey;monkey.patch_all()一定要放到導入socket模塊之前,否則gevent無法識別socket的阻塞)
# 服務端
from gevent import monkey;monkey.patch_all() from socket import * import gevent #如果不想用money.patch_all()打補丁,可以用gevent自帶的socket # from gevent import socket # s=socket.socket() def server(server_ip,port): s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind((server_ip,port)) s.listen(5) while True: conn,addr=s.accept() gevent.spawn(talk,conn,addr) def talk(conn,addr): try: while True: res=conn.recv(1024) print('client %s:%s msg: %s' %(addr[0],addr[1],res)) conn.send(res.upper()) except Exception as e: print(e) finally: conn.close() if __name__ == '__main__': server('127.0.0.1',8080)
# 客戶端
#_*_coding:utf-8_*_ from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
# 多線程並發多個客戶端
from threading import Thread from socket import * import threading def client(server_ip,port): c=socket(AF_INET,SOCK_STREAM) #套接字對象一定要加到函數內,即局部名稱空間內,放在函數外則被所有線程共享,那麽客戶端端口永遠一樣了 c.connect((server_ip,port)) count=0 while True: c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8')) msg=c.recv(1024) print(msg.decode('utf-8')) count+=1 if __name__ == '__main__': for i in range(500): t=Thread(target=client,args=('127.0.0.1',8080)) t.start()
python並發編程補充