1. 程式人生 > 實用技巧 >11.併發程式設計之程序

11.併發程式設計之程序

目錄

一、作業系統基礎

定義:作業系統是一個協調、管理和控制計算機硬體資源和軟體資源的控制程式。

作業系統本質位於計算機硬體和軟體之間,本質也是一個軟體。作業系統由作業系統的核心(運行於核心態,管理硬體資源)以及系統呼叫(運行於使用者態,為應用程式設計師寫的應用程式提供系統呼叫介面)兩部分組成。

作業系統的發展史:

  • 第一代計算機(1940~1955):真空管和穿孔卡片
  • 第二代計算機(1955~1965):電晶體和批處理系統
  • 第三代計算機(1965~1980):積體電路晶片和多道程式設計
  • 第四代計算機(1980~至今):個人計算機

多道技術:

  • 實現併發,現在是多核,但是每個核都會用到多道技術

  • 空間上的複用:記憶體中有多道程式

  • 時間上的複用:複用一個cpu的時間片

二、多程序基礎

2.1 程序基礎

程序:就是一個過程或一個任務,與程式的區別是程式只是一些程式碼,但是程序是程式的執行過程,程式執行起來就是一個程序,一個程序執行兩次是兩個程序。程序也是計算中最小的資源分配單位。

pid:程序的唯一標識,

2.2 併發和並行

併發:偽並行,即多個程式輪流在一個cpu上執行,像多個程式在同時執行。單個cpu+多道技術可實現併發。

並行:多個程式同時執行,在多個cpu上執行。

實現:單核下,可以利用多道技術,多核,每個核也都可以利用多道技術(多道技術是針對單核而言的)有4個核,5個任務,這樣同一時間有4個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4;一旦其中一個遇到I/O被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術。

多道技術:記憶體中同時存入多道(多個)程式,cpu從一個程序快速切換到另外一個,使每個程序各自執行一段時間,這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻可以執行多個程序,這就給人產生了並行的錯覺,即偽並行,以此來區分多處理器作業系統的真正硬體並行(多個cpu共享同一個實體記憶體)。

2.3 同步\非同步和阻塞\非阻塞

同步:就是在發出一個功能呼叫時,在沒有得到結果之前,該呼叫就不會返回。按照這個定義,其實絕大多數函式都是同步呼叫。簡單來說就是做A事的時候發起B事,必須等待B事結束。

非同步的概念和同步相對。當一個非同步功能呼叫發出後,呼叫者不能立刻得到結果。當該非同步功能完成後,通過狀態、通知或回撥來通知呼叫者。簡單來說就是做A事的時候發起B事件,不需要等待B事件結束就可以繼續A事件。

阻塞:cpu不工作,等待狀態,input,accept,recv,recvfrom,sleep,connect

非阻塞:cpu在工作

三、多程序

程序的三狀態圖

程序的排程演算法:給所有的程序分配資源或分配cpu的使用權的一種方法。

短作業優先,先來先服務,多級反饋演算法,

執行緒是程序中的一個單位,不能脫離程序存在,執行緒是計算中能被cpu排程的最小單位。

四、multiprocessing模組

4.1 建立子程序

pid --> process id

ppid --> parent peocess id

from multiprocessing import Process
import os
def func():
  	# 獲取程序id和父程序id
    print(os.getpid(),os.getppid())


if __name__ == '__main__':
  	# 只會在主程序中執行一次,這需要將只在主程序中執行的程式碼放在main下面。
    print('main', os.getpid(),os.getppid())
    p = Process(target=func) 
    p.start()       # 此處開啟了一個子程序   

在子程序中會自動import 父程序中所有的檔案,如果不加if __name__ == '__main__': ,那麼建立子程序時會自動執行了p =process(tartget=func)這會建立子程序的子程序,導致迴圈。

在Linux中會把父程序中的內容複製一份到子程序中,而不是import 父程序的程式碼,通過fork來完成的。

給子程序傳遞引數:在 p = Process(target=func) 傳遞引數,p = Process(target=func,args=('a')) 傳遞的args引數必須是一個元組。

能不能獲取子程序的返回值-->不能

可以開啟多個子程序

# 多元的處理程序的模組
from multiprocessing import Process
import os
import time

def func(name):
    time.sleep(5)
    print(os.getpid(),os.getppid(), name)

if __name__ == '__main__':
    print('main', os.getpid(),os.getppid())
    p = Process(target=func,args=('an',)) 
    p.start()       # 此處開啟了一個子程序   
    p = Process(target=func,args=('bn',)) 
    p.start()
    p = Process(target=func,args=('cn',)) 
    p.start()

父程序和子程序是同步的,必須先等父程序建立完畢之後再建立子程序,start()是非同步非阻塞,三個子程序幾乎是同時建立完成的。

4.2 同步阻塞

# 相關概念
# 同步阻塞
    # 呼叫函式必須等待結果\cpu沒工作 input sleep recv accept connect get
# 同步非阻塞
    # 呼叫函式必須等待結果\cpu工作 - 呼叫了一個高計算的函式 strip eval('1+2+3') sum max min sorted
# 非同步阻塞
    # 呼叫函式不需要立即獲取結果,而是繼續做其他的事情,在獲取結果的時候不知道先獲取誰的,但是總之需要等(阻塞)
# 非同步非阻塞
    #  呼叫函式不需要立即獲取結果,也不需要等 start() terminate()

p.join的作用

from multiprocessing import Process
import os
import time

def func(name):
    time.sleep(2)
    print('傳送給%s的郵件'%(name))

if __name__ == '__main__':
    arg_lst = [('an',), ('bn',), ('cn',)]
    for arg in arg_lst:
        p = Process(target=func,args=arg) 
        p.start()
    		# p.join()
    print('郵件傳送完畢')
# 列印結果
# 郵件傳送完畢
# 傳送給an的郵件
# 傳送給bn的郵件
# 傳送給cn的郵件

導致這種結果的原因:每個子程序的建立之間是非同步的,建立子程序和print()之間也是非同步的,但是傳送郵件的func函式是有延時的,導致先列印“郵件傳送完畢”再列印3句傳送給...的郵件,因為這3個子程序的建立之間是非同步的,所以這3句話會同時列印。

p.join()的作用 :等p這個程序執行完畢再去執行後續程式碼,可以理解為將上述的子程序建立和print變成同步關係。如果將上述的p.join()註釋掉,那麼就變成了建立一個程序join一次,即順序執行an,bn,cn這三個子程序,這樣就將子程序的建立變成了同步關係。

可以將上述程式碼改為:

from multiprocessing import Process
import os
import time
import random

def func(name):
    time.sleep(random.random())
    print('傳送給%s的郵件'%(name))

if __name__ == '__main__':
    arg_lst = [('an',), ('bn',), ('cn',)]
    p_lst = []
    for arg in arg_lst:
        p = Process(target=func,args=arg) 
        p.start()
        p_lst.append(p)
    for p in p_lst:
        p.join()
    print('郵件傳送完畢')
# 列印結果
# 傳送給bn的郵件
# 傳送給an的郵件
# 傳送給cn的郵件
# 郵件傳送完畢

這樣就是非同步的了,如果在 p_lst.append(p)後面直接p.join(),那麼他就是非同步的了。

4.3 非同步阻塞

多程序之間的資料是否隔離?是,即所佔記憶體是隔開的。

n = 0
def func():
    global n 
    n += 1

if __name__ == '__main__':
    p_1 = []
    for i in range(100):
        p = Process(target=func)
        p.start()
        p_1.append(p) 
    for p in p_1:
        p.join() 
    print(n)
# 列印結果 0

4.4 多程序例項:使用多程序實現一個併發的socketserver

# 客戶端
import time
import socket

sk = socket.socket()
sk.connect(('127.0.0.1',9001))

while True:
    sk.send(b'hello')
    msg =sk.recv(1024).decode('utf-8')
    print(msg)
    time.sleep(0.5)

sk.close()
# 服務端
import socket
from multiprocessing import Process


def talk(conn):
    while True:
        data = conn.recv(1024).decode('utf-8')
        data = data.upper().encode('utf-8')
        conn.send(data)

if __name__ == '__main__':
    sk = socket.socket()
    sk.bind(('127.0.0.1', 9001))
    sk.listen()

    while True:
        conn, addr = sk.accept()
        Process(target=talk, args=(conn,)).start()

    sk.close()

4.5 通過面向物件的方式開啟多程序

# 通過面向物件的方法開啟程序
import os
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self,a,b,c):		# 實現init方法才能傳參
        self.a = a
        self.b = b
        self.c = c
        super().__init__()
# 不呼叫父類的init方法,會報AttributeError: 'MyProcess' object has no attribute '_closed'
    def run(self):
        time.sleep(1)
        print(os.getpid(), os.getppid())


if __name__ == '__main__':
    print('本程序pid',os.getpid())
    for i in range(10):
        p = MyProcess(1,2,3)
        p.start()

4.6 process類的其它方法

方法名(屬性名) 描述
name 檢視程序名字
is_alive() 檢視程序是否還活著,返回bool
terminate() 強制殺死一個程序
daemon() 在啟動一個程序之前設定為守護程序

五、守護程序

主程序會等待所有的子程序結束,為了回收資源

守護程序會等待主程序的程式碼執行結束之後再結束,而不是等待整個主程序結束。

守護程序也是子程序,所以要在主程序結束前結束,主程序要給守護程序回收資源,守護程序和其他子程序的執行進度無關。

p = Process(target=func)
p.daemon = True		# 設定p為守護程序
p.start()
p.join()		# 等待其他程序結束才結束

守護程序可以向某個服務端彙報主程序的情況,也可以使用zabbix框架。

六、程序同步 Lock

鎖:會降低程式的執行效率,保證了程序之間資料安全的問題。

from multiprocessing import Lock,Process
import time

def func(i, lock):
    lock.acquire()      # 拿鑰匙
    print('被鎖起來的程式碼%s'%(i))
    lock.release()      # 還鑰匙
    time.sleep(1)

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        p = Process(target=func, args=(i ,lock))
        p.start()

七、程序通訊 Queue

# 程序之間通訊 IPC Inter Process communication
    # 基於檔案:同一臺機器上的多個程序之間通訊
        # 基於socket的檔案級別的通訊來完成傳遞
        # Queue 佇列
    # 基於網路:同一臺機器或多臺機器上的多程序間的通訊
        # 第三方工具(訊息中介軟體)
            # memcache(基本不用)
            # redis
            # rabbitmq
            # kafka

緊耦合程式和鬆耦合程式。

7.1 佇列

佇列 ipc 程序之間通訊 -- 資料安全

基於socket\pickle\Lock實現

pipe管道基於socket\pickle實現的

from multiprocessing import Queue,Process
def son(q):
    q.put('hello')
    

def func(q):
    print(q.get())  # 放進去多少個就取多少個,否則阻塞

if __name__ == '__main__':
    q = Queue()
    Process(target=son,args=(q,)).start()
    Process(target=func,args=(q,)).start()
    # 下面的get()會阻塞住,因為一個put對應一個get,這裡的get會等待往佇列中放資料再get。
    # # print(q.get())

7.2 生產者消費者模型

1、爬蟲

2、分散式操作:celery分散式框架:celery定時分佈任務,celery機制

​ 本質:就是讓生產資料和消費資料的效率達到平衡並且最大化的效率。

from multiprocessing import Queue,Process
import time
import random

# 多個生產者一個消費者
def consumer(q, name):        # 消費者:通常取到資料之後還要進行某些操作
    # for i in range(10):
    #     print(q.get())
    while True:
        food = q.get()
        if food:
            print('%s吃了%s'%(name, food))
        else:
            break

def producer(q, name, food):        # 生產者:通常在放資料之前就通過某些程式碼獲取資料
    for i in range(10):
        foodi = '%s%s'%(food, i)
        print('%s生產了%s'%(name,foodi))
        time.sleep(random.random())
        q.put(foodi)

if __name__ == '__main__':
    q = Queue()
    c1 = Process(target=consumer, args=(q,'消費者'))
    p1 = Process(target=producer, args=(q,'生產者1', '食物'))
    p2 = Process(target=producer, args=(q,'生產者2', '狗屎'))
    c1.start()
    p1.start()
    p2.start()
    # 等待資料全部放進去之後再put(None)用於終止消費者的迴圈
    p1.join()
    p2.join()
    q.put(None)

非同步阻塞:誰先來執行誰

# 非同步阻塞加生產者-消費者模型
import requests
from multiprocessing import Process,Queue
url_dic = {
    'cnblogs':'',
    'baidu':'https://www.baidu.com',   											        'gitee':'https://gitee.com/old_boy_python_stack__22/teaching_plan/issues/IX SRZ',
}

def producer(name,url,q):
    ret = requests.get(url)
    q.put((name,ret.text))

def consumer(q):
    while True:
        tup = q.get()
        if tup is None:break
        with open('%s.html'%tup[0],encoding='utf-8',mode='w') as f:
            f.write(tup[1])

if __name__ == '__main__':
    q = Queue()
    pl = []
    for key in url_dic:
        p = Process(target=producer,args=(key,url_dic[key],q))
        p.start()
        pl.append(p)
    Process(target=consumer,args=(q,)).start()
    for p in pl:p.join()
    q.put(None)