1. 程式人生 > >python實現並發學習筆記

python實現並發學習筆記

斷線 繼承 多核 code 數量 讀取 getpid adp elf

多進程

  • 進程:正在進行的過程或者說是一個任務,而負責執行任務則是cpu
  • 同一個程序執行兩次是兩次進程
  • 並發:
  • 並行:基於多核cpu

unix開子進程的拷貝一份父進程的數據

進行的三個狀態:運行,阻塞,就緒

在python中如何開啟子進程
  1. multiprocessing模塊中的process類
# 方式1
from multiprocessing import Process
import time

def task(name):
    print(‘%s is running‘%name)
    time.sleep(5)
    print(‘%s is done‘%name)

if __name__ ==‘__main__‘:
    p = Process(target=task,args=(‘子進程1‘,))
    p.start() #僅僅只是給操作系統發送了一個信號
#方式二:自定義類繼承自process
from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self): #函數名必須是run
        print(‘%s is running‘%self.name)
        time.sleep(5)
        print(‘%s is done‘%self.name)

if __name__ == ‘__main__‘:
    p = MyProcess(‘子進程1‘)
    p.start()#本質就是在調用p.run
    print(‘進‘)
查看進程的pid與ppid(進程id)
  • 補充:
    • windows查看正在執行的進程:tasklist | findstr pycharm
    • Mac os查看正在執行的進程:ps aus|grep pycharm
from multiprocessing import Process
import time
import os


def task(name):
    print(‘%s is running,parent id is <%s>‘%(os.getpid(),os.getppid()))
    time.sleep(5)
    print(‘%s is done‘%os.getpid())

if __name__ ==‘__main__‘:
    p = Process(target=task,args=(‘子進程1‘,))
    p.start() #僅僅只是給操作系統發送了一個信號
    print(‘主‘,os.getpid())
process對象的其他屬性或方法
  • join()等子進程結束完畢才會繼續運行主進程,主進程結束後所有的僵屍進程結束
  • start()只是向操作系統發送信號,並不只是把進程立馬開起來,如果連續有幾個start有可能執行的先後順序會錯亂,如果start方法後面立馬接一個join,多個子進程會變成串行
  • is_alive()查看子進程時候已經結束
  • terminate()殺死這個子進程
  • pid()查看進程id
  • name()查看這個對象的名字
守護進程
  • 當主進程執行結束子進程跟著結束:將daemon屬性設置為true,守護進程不能有子進程。代碼執行到程序最後一行代表程序運行結束
互斥鎖
  • 把並發變成串行:使用multiprocessing模塊下的Lock對象
#犧牲效率實現子進程串行
from multiprocessing import Process,Lock
import time

def tack(name,lock):
    lock.acquire()#加鎖
    print(‘%s,1‘%name)
    time.sleep(1)
    print(‘%s,2‘%name)
    time.sleep(1)
    print(‘%s,3‘%name)
    lock.release()#釋放鎖
if __name__ == ‘__main__‘:
    lock = Lock()#實例化鎖對象
    for i in range(3):
        p = Process(target=tack,args=(‘進程%s‘%i,lock))#把鎖傳到子進程中
        p.start()
  • 互斥鎖與join的區別:join確實能實現代碼串行,但join把整個代碼變成串行,但互斥鎖可以把部分代碼變成串行

    隊列
  • multiprocessing有提供基於ipc通信類

from multiprocessing import Queue

q = Queue(4)#指定隊列大小,如果不指定大小則大小為無窮盡

q.put(‘hello‘)#插入數據到隊列中
print(q.full())#判斷隊列中數據是否滿了
q.get()#從隊列讀取並刪除一個數據
print(q.empty())#判斷隊列中數據是否空了
  • 生產者消費這模型:
    • 生產者:生產數據任務
    • 消費者:消費數據任務
from multiprocessing import Process,Queue
import time
def producer(q):
    for i in range(3):
        res = ‘包子%s‘%i
        time.sleep(0.5)
        q.put(res)
        print(‘生產者生產了%s‘%res)

def consumer(q):
    while True:
        res = q.get()
        if res == None:break
        time.sleep(0.7)
        print(‘消費者吃了%s‘%res)


if __name__ == ‘__main__‘:
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    p1.start()
    c1.start()
    p1.join()
    c1.join()
    q.put(None)
  • JoinableQueue:與Queu使用一樣,只不過可以使用join方法

多線程

  • 線程之間的數據是共享的
如何開啟線程
  • threading模塊(使用方法與multiprose一樣)
import time
import random
from threading import Thread

def run(name):
    print(‘%s is running‘%name)
    time.sleep(random.randrange(1,5))
    print(‘%s is end‘%name)

if __name__ == ‘__main__‘:
    t1 = Thread(target=run,args=(‘喵‘,))
    t1.start()
    print(‘主線程‘)
import time
from threading import Thread
class MyThread(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print(‘%s is running‘%self.name)
        time.sleep(2)
        print(‘%s is end‘%self.name)

if __name__ == ‘__main__‘:
    t1 = MyThread(‘喵‘)
    t1.start()
    print(‘主線程‘)
thread對象的其他屬性與方法
from threading import Thread,currentThread,active_count,enumerate

import time

def task():
    print(‘%s is running‘%currentThread().getName())
    time.sleep(2)
    print(‘%s is done‘%currentThread().getName())

if __name__ == ‘__main__‘:
    t = Thread(target=task,name=‘子線程‘)
    t.start()
    #t.setName(‘兒子進程1‘)  設置線程名字
    print(t.isAlive())#判斷線程是否存活
    print(‘主線程‘,currentThread().getName())
    # t.join()#等當前線程結束才繼續執行主線程
    print(active_count())   #活躍的線程數
    print(enumerate())#[<_MainThread(MainThread, started 140735697388416)>, <Thread(子線程, started 123145519529984)>]
守護線程
  • 在一個進程內,只有一個線程,線程運行結束,代表這個一個進程結束。
  • 在一個進程內,開多個線程,主線程在代碼運行完畢,還需要等待其他線程幹完活才會結束
互斥鎖
  • 將並行編程串行,犧牲效率保證數據安全,與進程的互斥鎖一樣使用
GIL全局解釋器鎖
死鎖與遞歸鎖
  • 互斥鎖只能acquire一次
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(‘%s 拿到了A鎖‘%self.name)
        mutexB.acquire()
        print(‘%s 拿到了B鎖‘%self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(‘%s 拿到了B鎖‘ % self.name)
        time.sleep(0.1)
        mutexA.acquire()
        print(‘%s 拿到了A鎖‘ % self.name)
        mutexA.release()
        mutexB.release()

for i in range(10):
    t = MyThread()
    t.start()
  • 遞歸鎖:可以acquire多次,每次acquire一次計數器+1,只要計數為0,才能被其他線程搶到
# 遞歸鎖
from threading import Thread,RLock
import time
mutexA = mutexB = RLock()

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(‘%s 拿到了A鎖‘%self.name)
        mutexB.acquire()
        print(‘%s 拿到了B鎖‘%self.name)
        mutexB.release()
        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(‘%s 拿到了B鎖‘ % self.name)
        time.sleep(0.1)
        mutexA.acquire()
        print(‘%s 拿到了A鎖‘ % self.name)
        mutexA.release()
        mutexB.release()

for i in range(10):
    t = MyThread()
    t.start()
信號量:可以同時運行多個線程
from threading import Thread,Semaphore,currentThread
import time,random

sm = Semaphore(3)
def task():
    with sm:
        print(‘%s is run‘%currentThread().getName())
        time.sleep(random.randint(1,3))

if __name__ == ‘__main__‘:
    for i in range(10):
        t = Thread(target=task)
        t.start()
Event事件:實現線程同步
  1. event.wait()#等待(可設置等待時間)
  2. event.set()#開始
  3. event.is_set()
from threading import Thread,Event
import time
event = Event()
def student(name):
    print(‘學生 %s正在聽課‘%name)
    event.wait()
    print(‘學生%s下課了‘%name)

def teacher(name):
    print(‘老師%s 正在上課‘%name)
    time.sleep(7)
    print(‘老師%s 讓學生下課了‘%name)
    event.set()

if __name__ == ‘__main__‘:
    s1 = Thread(target=student,args=(‘wualin‘,))
    s2 = Thread(target=student,args=(‘wxx‘,))
    s3 = Thread(target=student,args=(‘zxx‘,))
    t1 = Thread(target=teacher,args=(‘egon‘,))
    s1.start()
    s2.start()
    s3.start()
    t1.start()
定時器
線程queue
  • 先進先出
import queue
q = queue.Queue(3) #先進先出->隊列
q.put(5)
q.put(‘miao‘)
q.put(‘sta‘)

print(q.get())
print(q.get())
print(q.get())

#get 和 put可設置是否阻塞以及阻塞時間

print(q.get(block=True,timeout=3))
  • 後進先出
q = queue.LifoQueue(3)#後進先出->堆棧

q.put(‘fisrt‘)
q.put(2)
q.put(‘miao‘)

print(q.get())
print(q.get())
print(q.get())
  • 優先級隊列
import queue
q = queue.PriorityQueue(3)#優先級隊列
q.put((10,‘one‘))
q.put((40,‘two‘))
q.put((20,‘three‘))
#數字越小優先級越高
print(q.get())
print(q.get())
print(q.get())
進程池與線程池
  • 池對線程或進程數量進行一個限制
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random

def task(name):
    print(‘name:%s pid:%s run‘%(name,os.getpid()))
    time.sleep(random.randint(1,3))


if __name__ == ‘__main__‘:
    pool = ProcessPoolExecutor(4)#進程池
    for i in range(10):
        pool.submit(task,‘egon%s‘%i)#異步調用

    pool.shutdown()#把往進程池提交任務的入口關閉

    print(‘主‘)
異步調用與回調機制
  • 提交任務的兩種方式
    • 同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果再執行下一行代碼,程序是串行執行

    • 異步調用

    from concurrent.futures import ThreadPoolExecutor
import time,random

def eat(name):
    print(‘%s is eating‘%name)
    time.sleep(random.randint(2,5))
    res = random.randint(5,10)*‘#‘
    return {‘name‘:name,‘size‘:res}

def count(weith):
    weith = weith.result()#
    name = weith[‘name‘]
    size = len(weith[‘size‘])
    print(‘name:%s eat is %s‘%(name,size))

if __name__ == ‘__main__‘:
    pool = ThreadPoolExecutor(5)
    pool.submit(eat,‘miao‘).add_done_callback(count)#回調機制
    pool.submit(eat,‘car‘).add_done_callback(count)
    pool.submit(eat,‘dog‘).add_done_callback(count)
    pool.submit(eat,‘zhang‘).add_done_callback(count)

協程

  • 單線程下實現並發,在應用程序層次實現並發效果
greenlet模塊
  • 在多個任務之間自由切換,但是不能檢測io,比yield封裝程度更高
from greenlet import greenlet

def eat(name):
    print(‘%s eat 1‘%name)
    g2.switch(‘miao‘)
    print(‘%s eat 2‘%name)
    g2.switch(‘miao‘)


def play(name):
    print(‘%s play 1‘%name)
    g1.switch()
    print(‘%s play 2‘%name)


g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch(‘miao‘)

>>>miao eat 1
>>>miao play 1
>>>miao eat 2
>>>miao play 2
gevent
import gevent
import time
from gevent import monkey
monkey.patch_all()#給IO打補丁,碰到IO就切換任務

def eat(name):
    print(‘%s eat 1‘%name)
    gevent.sleep(3)
    print(‘%s eat 2‘%name)

def play(name):
    print(‘%s play 1‘%name)
    gevent.sleep(4)
    print(‘%s play 2‘%name)

g1 = gevent.spawn(eat,‘miao‘)
g2 = gevent.spawn(play,‘alex‘)

#遇到io切換任務
g1.join()#異步提交任務,保證所有進程執行完才結束
g2.join()
gevent.joinall([g1,g2])#和上面的join一樣

I/O模型

python實現並發學習筆記