11.併發程式設計之程序
目錄
一、作業系統基礎
定義:作業系統是一個協調、管理和控制計算機硬體資源和軟體資源的控制程式。
作業系統本質位於計算機硬體和軟體之間,本質也是一個軟體。作業系統由作業系統的核心(運行於核心態,管理硬體資源)以及系統呼叫(運行於使用者態,為應用程式設計師寫的應用程式提供系統呼叫介面)兩部分組成。
作業系統的發展史:
- 第一代計算機(1940~1955):真空管和穿孔卡片
- 第二代計算機(1955~1965):電晶體和批處理系統
- 第三代計算機(1965~1980):積體電路晶片和多道程式設計
- 第四代計算機(1980~至今):個人計算機
多道技術:
-
實現併發,現在是多核,但是每個核都會用到多道技術
-
空間上的複用:記憶體中有多道程式
-
時間上的複用:複用一個cpu的時間片
二、多程序基礎
2.1 程序基礎
程序:就是一個過程或一個任務,與程式的區別是程式只是一些程式碼,但是程序是程式的執行過程,程式執行起來就是一個程序,一個程序執行兩次是兩個程序。程序也是計算中最小的資源分配單位。
pid:程序的唯一標識,
2.2 併發和並行
併發:偽並行,即多個程式輪流在一個cpu上執行,像多個程式在同時執行。單個cpu+多道技術可實現併發。
並行:多個程式同時執行,在多個cpu上執行。
實現:單核下,可以利用多道技術,多核,每個核也都可以利用多道技術(多道技術是針對單核而言的)有4個核,5個任務,這樣同一時間有4個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4;一旦其中一個遇到I/O被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術。
多道技術:記憶體中同時存入多道(多個)程式,cpu從一個程序快速切換到另外一個,使每個程序各自執行一段時間,這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻可以執行多個程序,這就給人產生了並行的錯覺,即偽並行,以此來區分多處理器作業系統的真正硬體並行(多個cpu共享同一個實體記憶體)。
2.3 同步\非同步和阻塞\非阻塞
同步:就是在發出一個功能呼叫時,在沒有得到結果之前,該呼叫就不會返回。按照這個定義,其實絕大多數函式都是同步呼叫。簡單來說就是做A事的時候發起B事,必須等待B事結束。
非同步的概念和同步相對。當一個非同步功能呼叫發出後,呼叫者不能立刻得到結果。當該非同步功能完成後,通過狀態、通知或回撥來通知呼叫者。簡單來說就是做A事的時候發起B事件,不需要等待B事件結束就可以繼續A事件。
阻塞:cpu不工作,等待狀態,input,accept,recv,recvfrom,sleep,connect
非阻塞:cpu在工作
三、多程序
程序的三狀態圖
程序的排程演算法:給所有的程序分配資源或分配cpu的使用權的一種方法。
短作業優先,先來先服務,多級反饋演算法,
執行緒是程序中的一個單位,不能脫離程序存在,執行緒是計算中能被cpu排程的最小單位。
四、multiprocessing模組
4.1 建立子程序
pid --> process id
ppid --> parent peocess id
from multiprocessing import Process
import os
def func():
# 獲取程序id和父程序id
print(os.getpid(),os.getppid())
if __name__ == '__main__':
# 只會在主程序中執行一次,這需要將只在主程序中執行的程式碼放在main下面。
print('main', os.getpid(),os.getppid())
p = Process(target=func)
p.start() # 此處開啟了一個子程序
在子程序中會自動import 父程序中所有的檔案,如果不加if __name__ == '__main__': ,那麼建立子程序時會自動執行了p =process(tartget=func)這會建立子程序的子程序,導致迴圈。
在Linux中會把父程序中的內容複製一份到子程序中,而不是import 父程序的程式碼,通過fork來完成的。
給子程序傳遞引數:在 p = Process(target=func) 傳遞引數,p = Process(target=func,args=('a')) 傳遞的args引數必須是一個元組。
能不能獲取子程序的返回值-->不能
可以開啟多個子程序
# 多元的處理程序的模組
from multiprocessing import Process
import os
import time
def func(name):
time.sleep(5)
print(os.getpid(),os.getppid(), name)
if __name__ == '__main__':
print('main', os.getpid(),os.getppid())
p = Process(target=func,args=('an',))
p.start() # 此處開啟了一個子程序
p = Process(target=func,args=('bn',))
p.start()
p = Process(target=func,args=('cn',))
p.start()
父程序和子程序是同步的,必須先等父程序建立完畢之後再建立子程序,start()是非同步非阻塞,三個子程序幾乎是同時建立完成的。
4.2 同步阻塞
# 相關概念
# 同步阻塞
# 呼叫函式必須等待結果\cpu沒工作 input sleep recv accept connect get
# 同步非阻塞
# 呼叫函式必須等待結果\cpu工作 - 呼叫了一個高計算的函式 strip eval('1+2+3') sum max min sorted
# 非同步阻塞
# 呼叫函式不需要立即獲取結果,而是繼續做其他的事情,在獲取結果的時候不知道先獲取誰的,但是總之需要等(阻塞)
# 非同步非阻塞
# 呼叫函式不需要立即獲取結果,也不需要等 start() terminate()
p.join的作用
from multiprocessing import Process
import os
import time
def func(name):
time.sleep(2)
print('傳送給%s的郵件'%(name))
if __name__ == '__main__':
arg_lst = [('an',), ('bn',), ('cn',)]
for arg in arg_lst:
p = Process(target=func,args=arg)
p.start()
# p.join()
print('郵件傳送完畢')
# 列印結果
# 郵件傳送完畢
# 傳送給an的郵件
# 傳送給bn的郵件
# 傳送給cn的郵件
導致這種結果的原因:每個子程序的建立之間是非同步的,建立子程序和print()之間也是非同步的,但是傳送郵件的func函式是有延時的,導致先列印“郵件傳送完畢”再列印3句傳送給...的郵件,因為這3個子程序的建立之間是非同步的,所以這3句話會同時列印。
p.join()的作用 :等p這個程序執行完畢再去執行後續程式碼,可以理解為將上述的子程序建立和print變成同步關係。如果將上述的p.join()註釋掉,那麼就變成了建立一個程序join一次,即順序執行an,bn,cn這三個子程序,這樣就將子程序的建立變成了同步關係。
可以將上述程式碼改為:
from multiprocessing import Process
import os
import time
import random
def func(name):
time.sleep(random.random())
print('傳送給%s的郵件'%(name))
if __name__ == '__main__':
arg_lst = [('an',), ('bn',), ('cn',)]
p_lst = []
for arg in arg_lst:
p = Process(target=func,args=arg)
p.start()
p_lst.append(p)
for p in p_lst:
p.join()
print('郵件傳送完畢')
# 列印結果
# 傳送給bn的郵件
# 傳送給an的郵件
# 傳送給cn的郵件
# 郵件傳送完畢
這樣就是非同步的了,如果在 p_lst.append(p)後面直接p.join(),那麼他就是非同步的了。
4.3 非同步阻塞
多程序之間的資料是否隔離?是,即所佔記憶體是隔開的。
n = 0
def func():
global n
n += 1
if __name__ == '__main__':
p_1 = []
for i in range(100):
p = Process(target=func)
p.start()
p_1.append(p)
for p in p_1:
p.join()
print(n)
# 列印結果 0
4.4 多程序例項:使用多程序實現一個併發的socketserver
# 客戶端
import time
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',9001))
while True:
sk.send(b'hello')
msg =sk.recv(1024).decode('utf-8')
print(msg)
time.sleep(0.5)
sk.close()
# 服務端
import socket
from multiprocessing import Process
def talk(conn):
while True:
data = conn.recv(1024).decode('utf-8')
data = data.upper().encode('utf-8')
conn.send(data)
if __name__ == '__main__':
sk = socket.socket()
sk.bind(('127.0.0.1', 9001))
sk.listen()
while True:
conn, addr = sk.accept()
Process(target=talk, args=(conn,)).start()
sk.close()
4.5 通過面向物件的方式開啟多程序
# 通過面向物件的方法開啟程序
import os
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,a,b,c): # 實現init方法才能傳參
self.a = a
self.b = b
self.c = c
super().__init__()
# 不呼叫父類的init方法,會報AttributeError: 'MyProcess' object has no attribute '_closed'
def run(self):
time.sleep(1)
print(os.getpid(), os.getppid())
if __name__ == '__main__':
print('本程序pid',os.getpid())
for i in range(10):
p = MyProcess(1,2,3)
p.start()
4.6 process類的其它方法
方法名(屬性名) | 描述 |
---|---|
name | 檢視程序名字 |
is_alive() | 檢視程序是否還活著,返回bool |
terminate() | 強制殺死一個程序 |
daemon() | 在啟動一個程序之前設定為守護程序 |
五、守護程序
主程序會等待所有的子程序結束,為了回收資源
守護程序會等待主程序的程式碼執行結束之後再結束,而不是等待整個主程序結束。
守護程序也是子程序,所以要在主程序結束前結束,主程序要給守護程序回收資源,守護程序和其他子程序的執行進度無關。
p = Process(target=func)
p.daemon = True # 設定p為守護程序
p.start()
p.join() # 等待其他程序結束才結束
守護程序可以向某個服務端彙報主程序的情況,也可以使用zabbix框架。
六、程序同步 Lock
鎖:會降低程式的執行效率,保證了程序之間資料安全的問題。
from multiprocessing import Lock,Process
import time
def func(i, lock):
lock.acquire() # 拿鑰匙
print('被鎖起來的程式碼%s'%(i))
lock.release() # 還鑰匙
time.sleep(1)
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=func, args=(i ,lock))
p.start()
七、程序通訊 Queue
# 程序之間通訊 IPC Inter Process communication
# 基於檔案:同一臺機器上的多個程序之間通訊
# 基於socket的檔案級別的通訊來完成傳遞
# Queue 佇列
# 基於網路:同一臺機器或多臺機器上的多程序間的通訊
# 第三方工具(訊息中介軟體)
# memcache(基本不用)
# redis
# rabbitmq
# kafka
緊耦合程式和鬆耦合程式。
7.1 佇列
佇列 ipc 程序之間通訊 -- 資料安全
基於socket\pickle\Lock實現
pipe管道基於socket\pickle實現的
from multiprocessing import Queue,Process
def son(q):
q.put('hello')
def func(q):
print(q.get()) # 放進去多少個就取多少個,否則阻塞
if __name__ == '__main__':
q = Queue()
Process(target=son,args=(q,)).start()
Process(target=func,args=(q,)).start()
# 下面的get()會阻塞住,因為一個put對應一個get,這裡的get會等待往佇列中放資料再get。
# # print(q.get())
7.2 生產者消費者模型
1、爬蟲
2、分散式操作:celery分散式框架:celery定時分佈任務,celery機制
本質:就是讓生產資料和消費資料的效率達到平衡並且最大化的效率。
from multiprocessing import Queue,Process
import time
import random
# 多個生產者一個消費者
def consumer(q, name): # 消費者:通常取到資料之後還要進行某些操作
# for i in range(10):
# print(q.get())
while True:
food = q.get()
if food:
print('%s吃了%s'%(name, food))
else:
break
def producer(q, name, food): # 生產者:通常在放資料之前就通過某些程式碼獲取資料
for i in range(10):
foodi = '%s%s'%(food, i)
print('%s生產了%s'%(name,foodi))
time.sleep(random.random())
q.put(foodi)
if __name__ == '__main__':
q = Queue()
c1 = Process(target=consumer, args=(q,'消費者'))
p1 = Process(target=producer, args=(q,'生產者1', '食物'))
p2 = Process(target=producer, args=(q,'生產者2', '狗屎'))
c1.start()
p1.start()
p2.start()
# 等待資料全部放進去之後再put(None)用於終止消費者的迴圈
p1.join()
p2.join()
q.put(None)
非同步阻塞:誰先來執行誰
# 非同步阻塞加生產者-消費者模型
import requests
from multiprocessing import Process,Queue
url_dic = {
'cnblogs':'',
'baidu':'https://www.baidu.com', 'gitee':'https://gitee.com/old_boy_python_stack__22/teaching_plan/issues/IX SRZ',
}
def producer(name,url,q):
ret = requests.get(url)
q.put((name,ret.text))
def consumer(q):
while True:
tup = q.get()
if tup is None:break
with open('%s.html'%tup[0],encoding='utf-8',mode='w') as f:
f.write(tup[1])
if __name__ == '__main__':
q = Queue()
pl = []
for key in url_dic:
p = Process(target=producer,args=(key,url_dic[key],q))
p.start()
pl.append(p)
Process(target=consumer,args=(q,)).start()
for p in pl:p.join()
q.put(None)