python實現並發學習筆記
阿新 • • 發佈:2018-11-10
斷線 繼承 多核 code 數量 讀取 getpid adp elf
多進程
- 進程:正在進行的過程或者說是一個任務,而負責執行任務則是cpu
- 同一個程序執行兩次是兩次進程
- 並發:
- 並行:基於多核cpu
unix開子進程的拷貝一份父進程的數據
進行的三個狀態:運行,阻塞,就緒
在python中如何開啟子進程
- multiprocessing模塊中的process類
# 方式1 from multiprocessing import Process import time def task(name): print(‘%s is running‘%name) time.sleep(5) print(‘%s is done‘%name) if __name__ ==‘__main__‘: p = Process(target=task,args=(‘子進程1‘,)) p.start() #僅僅只是給操作系統發送了一個信號
#方式二:自定義類繼承自process from multiprocessing import Process import time class MyProcess(Process): def __init__(self,name): super().__init__() self.name = name def run(self): #函數名必須是run print(‘%s is running‘%self.name) time.sleep(5) print(‘%s is done‘%self.name) if __name__ == ‘__main__‘: p = MyProcess(‘子進程1‘) p.start()#本質就是在調用p.run print(‘進‘)
查看進程的pid與ppid(進程id)
- 補充:
- windows查看正在執行的進程:tasklist | findstr pycharm
- Mac os查看正在執行的進程:ps aus|grep pycharm
from multiprocessing import Process import time import os def task(name): print(‘%s is running,parent id is <%s>‘%(os.getpid(),os.getppid())) time.sleep(5) print(‘%s is done‘%os.getpid()) if __name__ ==‘__main__‘: p = Process(target=task,args=(‘子進程1‘,)) p.start() #僅僅只是給操作系統發送了一個信號 print(‘主‘,os.getpid())
process對象的其他屬性或方法
- join()等子進程結束完畢才會繼續運行主進程,主進程結束後所有的僵屍進程結束
- start()只是向操作系統發送信號,並不只是把進程立馬開起來,如果連續有幾個start有可能執行的先後順序會錯亂,如果start方法後面立馬接一個join,多個子進程會變成串行
- is_alive()查看子進程時候已經結束
- terminate()殺死這個子進程
- pid()查看進程id
- name()查看這個對象的名字
守護進程
- 當主進程執行結束子進程跟著結束:將daemon屬性設置為true,守護進程不能有子進程。代碼執行到程序最後一行代表程序運行結束
互斥鎖
- 把並發變成串行:使用multiprocessing模塊下的Lock對象
#犧牲效率實現子進程串行
from multiprocessing import Process,Lock
import time
def tack(name,lock):
lock.acquire()#加鎖
print(‘%s,1‘%name)
time.sleep(1)
print(‘%s,2‘%name)
time.sleep(1)
print(‘%s,3‘%name)
lock.release()#釋放鎖
if __name__ == ‘__main__‘:
lock = Lock()#實例化鎖對象
for i in range(3):
p = Process(target=tack,args=(‘進程%s‘%i,lock))#把鎖傳到子進程中
p.start()
互斥鎖與join的區別:join確實能實現代碼串行,但join把整個代碼變成串行,但互斥鎖可以把部分代碼變成串行
隊列
multiprocessing有提供基於ipc通信類
from multiprocessing import Queue
q = Queue(4)#指定隊列大小,如果不指定大小則大小為無窮盡
q.put(‘hello‘)#插入數據到隊列中
print(q.full())#判斷隊列中數據是否滿了
q.get()#從隊列讀取並刪除一個數據
print(q.empty())#判斷隊列中數據是否空了
- 生產者消費這模型:
- 生產者:生產數據任務
- 消費者:消費數據任務
from multiprocessing import Process,Queue
import time
def producer(q):
for i in range(3):
res = ‘包子%s‘%i
time.sleep(0.5)
q.put(res)
print(‘生產者生產了%s‘%res)
def consumer(q):
while True:
res = q.get()
if res == None:break
time.sleep(0.7)
print(‘消費者吃了%s‘%res)
if __name__ == ‘__main__‘:
q = Queue()
p1 = Process(target=producer,args=(q,))
c1 = Process(target=consumer,args=(q,))
p1.start()
c1.start()
p1.join()
c1.join()
q.put(None)
- JoinableQueue:與Queu使用一樣,只不過可以使用join方法
多線程
- 線程之間的數據是共享的
如何開啟線程
- threading模塊(使用方法與multiprose一樣)
import time
import random
from threading import Thread
def run(name):
print(‘%s is running‘%name)
time.sleep(random.randrange(1,5))
print(‘%s is end‘%name)
if __name__ == ‘__main__‘:
t1 = Thread(target=run,args=(‘喵‘,))
t1.start()
print(‘主線程‘)
import time
from threading import Thread
class MyThread(Thread):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print(‘%s is running‘%self.name)
time.sleep(2)
print(‘%s is end‘%self.name)
if __name__ == ‘__main__‘:
t1 = MyThread(‘喵‘)
t1.start()
print(‘主線程‘)
thread對象的其他屬性與方法
from threading import Thread,currentThread,active_count,enumerate
import time
def task():
print(‘%s is running‘%currentThread().getName())
time.sleep(2)
print(‘%s is done‘%currentThread().getName())
if __name__ == ‘__main__‘:
t = Thread(target=task,name=‘子線程‘)
t.start()
#t.setName(‘兒子進程1‘) 設置線程名字
print(t.isAlive())#判斷線程是否存活
print(‘主線程‘,currentThread().getName())
# t.join()#等當前線程結束才繼續執行主線程
print(active_count()) #活躍的線程數
print(enumerate())#[<_MainThread(MainThread, started 140735697388416)>, <Thread(子線程, started 123145519529984)>]
守護線程
- 在一個進程內,只有一個線程,線程運行結束,代表這個一個進程結束。
- 在一個進程內,開多個線程,主線程在代碼運行完畢,還需要等待其他線程幹完活才會結束
互斥鎖
- 將並行編程串行,犧牲效率保證數據安全,與進程的互斥鎖一樣使用
GIL全局解釋器鎖
死鎖與遞歸鎖
- 互斥鎖只能acquire一次
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print(‘%s 拿到了A鎖‘%self.name)
mutexB.acquire()
print(‘%s 拿到了B鎖‘%self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print(‘%s 拿到了B鎖‘ % self.name)
time.sleep(0.1)
mutexA.acquire()
print(‘%s 拿到了A鎖‘ % self.name)
mutexA.release()
mutexB.release()
for i in range(10):
t = MyThread()
t.start()
- 遞歸鎖:可以acquire多次,每次acquire一次計數器+1,只要計數為0,才能被其他線程搶到
# 遞歸鎖
from threading import Thread,RLock
import time
mutexA = mutexB = RLock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print(‘%s 拿到了A鎖‘%self.name)
mutexB.acquire()
print(‘%s 拿到了B鎖‘%self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print(‘%s 拿到了B鎖‘ % self.name)
time.sleep(0.1)
mutexA.acquire()
print(‘%s 拿到了A鎖‘ % self.name)
mutexA.release()
mutexB.release()
for i in range(10):
t = MyThread()
t.start()
信號量:可以同時運行多個線程
from threading import Thread,Semaphore,currentThread
import time,random
sm = Semaphore(3)
def task():
with sm:
print(‘%s is run‘%currentThread().getName())
time.sleep(random.randint(1,3))
if __name__ == ‘__main__‘:
for i in range(10):
t = Thread(target=task)
t.start()
Event事件:實現線程同步
- event.wait()#等待(可設置等待時間)
- event.set()#開始
- event.is_set()
from threading import Thread,Event
import time
event = Event()
def student(name):
print(‘學生 %s正在聽課‘%name)
event.wait()
print(‘學生%s下課了‘%name)
def teacher(name):
print(‘老師%s 正在上課‘%name)
time.sleep(7)
print(‘老師%s 讓學生下課了‘%name)
event.set()
if __name__ == ‘__main__‘:
s1 = Thread(target=student,args=(‘wualin‘,))
s2 = Thread(target=student,args=(‘wxx‘,))
s3 = Thread(target=student,args=(‘zxx‘,))
t1 = Thread(target=teacher,args=(‘egon‘,))
s1.start()
s2.start()
s3.start()
t1.start()
定時器
線程queue
- 先進先出
import queue
q = queue.Queue(3) #先進先出->隊列
q.put(5)
q.put(‘miao‘)
q.put(‘sta‘)
print(q.get())
print(q.get())
print(q.get())
#get 和 put可設置是否阻塞以及阻塞時間
print(q.get(block=True,timeout=3))
- 後進先出
q = queue.LifoQueue(3)#後進先出->堆棧
q.put(‘fisrt‘)
q.put(2)
q.put(‘miao‘)
print(q.get())
print(q.get())
print(q.get())
- 優先級隊列
import queue
q = queue.PriorityQueue(3)#優先級隊列
q.put((10,‘one‘))
q.put((40,‘two‘))
q.put((20,‘three‘))
#數字越小優先級越高
print(q.get())
print(q.get())
print(q.get())
進程池與線程池
- 池對線程或進程數量進行一個限制
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def task(name):
print(‘name:%s pid:%s run‘%(name,os.getpid()))
time.sleep(random.randint(1,3))
if __name__ == ‘__main__‘:
pool = ProcessPoolExecutor(4)#進程池
for i in range(10):
pool.submit(task,‘egon%s‘%i)#異步調用
pool.shutdown()#把往進程池提交任務的入口關閉
print(‘主‘)
異步調用與回調機制
- 提交任務的兩種方式
同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果再執行下一行代碼,程序是串行執行
異步調用
from concurrent.futures import ThreadPoolExecutor
import time,random
def eat(name):
print(‘%s is eating‘%name)
time.sleep(random.randint(2,5))
res = random.randint(5,10)*‘#‘
return {‘name‘:name,‘size‘:res}
def count(weith):
weith = weith.result()#
name = weith[‘name‘]
size = len(weith[‘size‘])
print(‘name:%s eat is %s‘%(name,size))
if __name__ == ‘__main__‘:
pool = ThreadPoolExecutor(5)
pool.submit(eat,‘miao‘).add_done_callback(count)#回調機制
pool.submit(eat,‘car‘).add_done_callback(count)
pool.submit(eat,‘dog‘).add_done_callback(count)
pool.submit(eat,‘zhang‘).add_done_callback(count)
協程
- 單線程下實現並發,在應用程序層次實現並發效果
greenlet模塊
- 在多個任務之間自由切換,但是不能檢測io,比yield封裝程度更高
from greenlet import greenlet
def eat(name):
print(‘%s eat 1‘%name)
g2.switch(‘miao‘)
print(‘%s eat 2‘%name)
g2.switch(‘miao‘)
def play(name):
print(‘%s play 1‘%name)
g1.switch()
print(‘%s play 2‘%name)
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch(‘miao‘)
>>>miao eat 1
>>>miao play 1
>>>miao eat 2
>>>miao play 2
gevent
import gevent
import time
from gevent import monkey
monkey.patch_all()#給IO打補丁,碰到IO就切換任務
def eat(name):
print(‘%s eat 1‘%name)
gevent.sleep(3)
print(‘%s eat 2‘%name)
def play(name):
print(‘%s play 1‘%name)
gevent.sleep(4)
print(‘%s play 2‘%name)
g1 = gevent.spawn(eat,‘miao‘)
g2 = gevent.spawn(play,‘alex‘)
#遇到io切換任務
g1.join()#異步提交任務,保證所有進程執行完才結束
g2.join()
gevent.joinall([g1,g2])#和上面的join一樣
I/O模型
python實現並發學習筆記