1. 程式人生 > >python並發編程

python並發編程

per mark 滿了 getpid 作用 進行 main 任務 方法

操作系統基礎

  • 操作系統的兩大功能
    1. 封裝好硬件復雜的接口,提供良好的抽象接口,運行應用程序只需要調用這些接口即可啟動相應的硬件服務,例如啟動暴風音影
      • 雙擊執行文件
      • 獲取應用軟件在硬盤上的存儲地址
      • 操作系統會直接將硬盤上的數據讀取到內存中
      • 交給cpu運行
    2. 管理、調度進程,並且將多個進程對硬件的競爭變得有序;
  • 系統演化

    • 第二代計算機:批處理系統
      • 1701主要是負責I/O操作,批量輸入和批量輸出
      • 7094主要是負責計算, 運行程序
    • 第三代計算機: 多個聯機終端 + 多道技術(針對單核

      • 多道技術:多道指的是多個程序,多道技術的實現是為了解決多個程序競爭或者說共享同一個資源(cpu)的有序調度問題,解決方式為多路復用,復用分為時間上的復用和空間上的復用

        1. 時間上的復用:

          當一個程序在等待I/O時,另一個程序可以使用cpu,如果內存中可以存放足夠多的作業,則cpu利用率可以達到100%

          操作系統采用了多道技術後,可以控制進程的切換,或者說進程之間去爭搶cpu的執行權限。這種切換不僅會在一個進程遇到io時進行,一個進程占用cpu時間過長也會切換,或者說被操作系統奪走cpu的執行權限

        2. 空間上的復用

          程序之間的內存必須分割,這種分割需要在硬件層面實現,由操作系統控制。如果內存彼此不分割,則一個程序可以訪問另外一個程序的內存,

          首先喪失的是安全性,比如你的qq程序可以訪問操作系統的內存,這意味著你的qq可以拿到操作系統的所有權限。

          其次喪失的是穩定性,某個程序崩潰時有可能把別的程序的內存也給回收了,比方說把操作系統的內存給回收了,則操作系統崩潰。


開啟子進程

  • 方法1(基礎版本)
from multiprocessing import Process
import time


def task(name):
    print(%s starts running‘ % name)
    time.sleep(3)
    print(%s finish‘ % name)

if __name__ == ‘__main__‘:
    p = Process(target=task, kwargs={‘name‘:‘子進程1‘})
    p.start()
    print(‘主進程‘)

join方法

  • 在主進程的任務與子進程的任務彼此獨立的情況下,主進程的任務先執行完畢後,主進程還需要等待子進程執行完畢,然後統一回收資源。

  • join作用 :如果主進程的任務在執行到某個階段時,需要等待子進程執行完畢後才能繼續執行,就需要有一種機制能讓主進程檢測子進程是否運行完畢,在子進程執行完畢後才能繼續執行, 否則一直在原地阻塞


查看子進程

  • 可以通過os.getpid()查看當前進程代號, os.getppid()查看父進程代號;
from multiprocessing import Process
import time
import os


def task():
    print(%s is running, parent id <%s>‘ % (os.getpid(), os.getppid()))
    time.sleep(4)
    print(%s finish, parent id <%s>‘ % (os.getpid(), os.getppid()))
    print("==========")

if __name__ == ‘__main__‘:
    p = Process(target=task)
    p.start()
    print(‘主進程<%s> is running, 我的父進程<%s>‘ % (os.getpid(), os.getppid()))
  • 方法2:(升級版本)
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        """方法名稱必須為run"""
        print(%s starts running‘ % self.name)
        time.sleep(3)
        print(%s finish‘ % self.name)


if __name__ == ‘__main__‘:
    p = MyProcess(‘子進程1‘)
    p.start()  # 會調用run方法
    print(‘主進程!‘)

子進程和父進程是存儲在不同的內存空間的

  • 驗證實例
from multiprocessing import Process
import time
import os

def task():
    print(%s is running, parent id <%s>‘ % (os.getpid(), os.getppid()))
    time.sleep(4)
    print(%s finish, parent id <%s>‘ % (os.getpid(), os.getppid()))
    print("==========")

if __name__ == ‘__main__‘:
    p = Process(target=task)
    p.start()
    print(‘主進程<%s> is running, 我的父進程<%s>‘ % (os.getpid(), os.getppid()))

socket多進程代碼

# server服務器
import socket
from multiprocessing import Process
import os

def run(conn):
    """開始等待,接收數據"""
    while True:
        data = conn.recv(1024)
        if not data: break
        res = ‘來自服務端<%s>的反饋:%s %(os.getpid(), data)
        conn.send(res.encode(‘utf-8‘))

def server(ip, port):
    """啟動服務器模版,並生成服務器"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn, addr = server.accept()
        # 開啟子進程服務,可以同時服務多個客戶端
        p = Process(target=run, args=(conn,))
        p.start()

server(‘127.0.0.1‘, 8800)
# 客戶端
import socket
import time
import time


client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((‘127.0.0.1‘, 8800))
msg = ‘hello from fqh‘
client.send(msg.encode((‘utf8‘)))

data = client.recv(1024)
print(data.decode(‘utf-8‘))
time.sleep(5)

守護進程

  • 守護進程在主進程代碼執行完之後會立即斷開
  • 設置守護進程方法:p.daemon=True, 且必須在p.start()之前設置
from multiprocessing import Process
import time

def task(name):
    print(%s starts‘ % name)
    time.sleep(2)
    print(%s ends‘ % name)

if __name__ == ‘__main__‘:
    p=Process(target=task,args=(‘egon‘,))
    # 一定要在p.start()前設置,設置p為守護進程,禁止p創建子進程,並且父進程代碼執行結束,p即終止運行
    p.daemon=True
    p.start()
    time.sleep(1)
    # 只要終端打印出這一行內容,那麽守護進程p也就跟著結束掉了
    print(‘主‘)

互斥鎖

  • 只需要將對共享數據修改的那一段代碼加上鎖就可以,其余的部分還是可以並行!

  • 模擬購票系統, 運用互鎖功能實現搶票,代碼如下:

from multiprocessing import Process, Lock
import time,json


def search(name):
    """查詢剩余的票數"""
    dic=json.load(open(‘db.txt‘))
    time.sleep(1)
    print(\033[43m%s 查到剩余票數%s\033[0m‘ %(name,dic[‘count‘]))


def get(name):
    """購票"""
    dic=json.load(open(‘db.txt‘))
    time.sleep(1) #模擬讀數據的網絡延遲
    if dic[‘count‘] >0:
        dic[‘count‘]-=1
        time.sleep(1) #模擬寫數據的網絡延遲
        json.dump(dic,open(‘db.txt‘,‘w‘))
        print(\033[46m%s 購票成功!\033[0m‘ %name)
    else:
        print(\033[46m%s 對不起,搶票失敗!\033[0m‘ %name)


def task(name, mutex):
    """購票流程"""
    search(name)
    mutex.acquire()
    get(name)
    mutex.release()

if __name__ == ‘__main__‘:
    # 主進程,模擬並發10個客戶端搶票
    mutex = Lock()
    for i in range(10):
        name=‘<路人%s>‘ % i
        p=Process(target=task,args=(name, mutex))
        p.start()

隊列Queue

  • Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

  • maxsize是隊列中允許最大項數,省略則無大小限制。但需要明確:
    1. 存放的是消息而非大數據
    2. 用的是內存空間,因而maxsize即便是無大小限制也受限於內存大小
  • q.put()方法用於插入數據,q.get()從隊列讀取並刪除一個元素

  • 在存儲數據時候, 若隊列已經滿了再存入數據就會出現進程阻塞狀態, 可以用q.full()判斷隊列是否為存滿數據

  • 同樣,在取出數據時候,若隊列為空時,取數據也會出現進程阻塞狀態, 可以用q.empty()判斷隊列是否為空狀態


生產消費者模式

  • 簡單版(單生產者及消費者模式)
from multiprocessing import Process,Queue
import time,random,os

def producer(q, name, food):
    """生產者"""
    for i in range(3):
        time.sleep(1)
        res = %s%s %(food, i)
        q.put(res)
        print(\033[45m%s 生產了 %s\033[0m‘ %(name,res))

def consumer(q, name):
    """消費者"""
    while True:
        res = q.get()
        if res == None:  # 判斷是否生產完畢
            print(‘序列已經清空!‘)
            break
        time.sleep(2)
        print(\033[43m%s%s\033[0m‘ % (name, res))

if __name__ == ‘__main__‘:
    q = Queue()  # 生成容器對象
    # 生產者們
    p1 = Process(target=producer,args=(q, ‘egon‘, ‘包子‘))
    # 消費者們
    c1 = Process(target=consumer,args=(q, ‘alex‘))
    # 開始
    p1.start()
    c1.start()
    p1.join()
    q.put(None)  # 當生產進程執行完畢後再放進None表示結束
    print(‘主進程執行結束====‘)
  • 升級版(多消費者多生產者模式)
from multiprocessing import Process,Queue
import time,random,os


def producer(q, name, food):
    """生產者"""
    for i in range(3):
        time.sleep(1)
        res = %s%s %(food, i)
        q.put(res)
        print(\033[45m%s 生產了 %s\033[0m‘ %(name,res))


def consumer(q, name):
    """消費者"""
    while True:
        res = q.get()
        if res == "_a_":  # 判斷是否生產完畢
            print(%s結束消費‘%name)
            break
        time.sleep(2)
        print(\033[43m%s%s\033[0m‘ % (name, res))


if __name__ == ‘__main__‘:
    q = Queue()  # 生成容器對象
    # 生產者們
    p1 = Process(target=producer, args=(q, ‘egon‘, ‘包子‘))
    p2 = Process(target=producer, args=(q, ‘kate‘, ‘襪子‘))
    p3 = Process(target=producer, args=(q, ‘steven‘, ‘iphone‘))
    # 消費者們
    c1 = Process(target=consumer, args=(q, ‘alex‘))
    c2 = Process(target=consumer, args=(q, ‘frank‘))
    # 生產者開始生產
    p1.start()
    p2.start()
    p3.start()
    # 消費者開始消費
    c1.start()
    c2.start()
    # 等待生產完畢後添加結束生產標識符, 否則消費者進程會阻塞主進程,
    # 註意有幾個消費者就要添加幾個結束標識符,必須結束所有的消費進程,否則依然阻塞
    p1.join()
    p3.join()
    p2.join()
    q.put(‘_a_‘)
    q.put(‘_a_‘)
    print(‘主進程‘)

  • 最高級版本, 利用joinable_queue和守護進程
from multiprocessing import Process, JoinableQueue
import time


def producer(q, name, food):
    """生產者"""
    for i in range(3):
        time.sleep(1)
        res = %s%s %(food, i)
        q.put(res)
        print(\033[45m%s 生產了 %s\033[0m‘ %(name,res))
    q.join()  # 當q為空的時候才會繼續主進程


def consumer(q, name):
    """消費者"""
    while True:
        res = q.get()
        if res == "_a_":  # 判斷是否生產完畢
            print(%s結束消費‘%name)
            break
        time.sleep(2)
        print(\033[43m%s%s\033[0m‘ % (name, res))
        q.task_done()  # 每取走一個數據都會向生產者反饋信息


if __name__ == ‘__main__‘:
    q = JoinableQueue()  # 生成容器對象
    # 生產者們
    p1 = Process(target=producer, args=(q, ‘egon‘, ‘包子‘))
    p2 = Process(target=producer, args=(q, ‘kate‘, ‘襪子‘))
    p3 = Process(target=producer, args=(q, ‘steven‘, ‘iphone‘))
    # 消費者們
    c1 = Process(target=consumer, args=(q, ‘alex‘))
    c2 = Process(target=consumer, args=(q, ‘frank‘))
    # 設置消費者進程為守護進程,當主程序執行完畢會自動回收
    c1.daemon = True
    c2.daemon = True
    # 生產者開始生產
    p1.start()
    p2.start()
    p3.start()
    # 消費者開始消費
    c1.start()
    c2.start()
    # 等待生產者進程結束
    p1.join()
    p3.join()
    p2.join()
    # 此時隊列一定為空
    print(‘主進程‘)

python並發編程