Laravel 列印SQL日誌
阿新 • • 發佈:2020-12-26
程序程序即正在執行的一個過程。程序是對正在執行程式的一個抽象.
開啟程序的方法:
import time import os from multiprocessing import Process def func(args,args2): print(args,args2) print('子',os.getpid()) print('子的父',os.getppid()) if __name__=='__main__': p = Process(target=func,args=('引數1','引數2')) #主程序 # p是一個程序物件 還沒啟動函式方法p.start() #建立一個子程序 和父程序非同步進行 print('*'*5) print('父',os.getpid()) print('父的父',os.getppid()) #程序的生命週期 #開啟子程序的主程序: #主程序的程式碼如果長 等待自己的程式碼執行結束 #子程序始行時間長 主程序執行完畢後 等待子進行執行完畢後 主程序才結束
import os # from multiprocessing import Process # # class Myprocess(Process): #繼承方法def __init__(self,args,args2): # super().__init__() # self.args = args # self.args2 = args2 # def run(self): # print(self.pid) # # # if __name__=='__main__': # print('主:',os.getpid()) # p1 = Myprocess(1,2) # p1.start() # p2 = Myprocess(2,3) # p2.start()
開啟多個子程序:
# import time # from multiprocessing import Process # def func(args,args2): # print('*'*args) # time.sleep(3) # print('*'*args2) # # if __name__=='__main__': # p_list = [] # for i in range(1,10): # p = Process(target=func,args=(1*i,2*i)) # p_list.append(p) # p.start() # [p.join() for p in p_list] # print('執行完了')多程序用list
多程序中的幾種方法:
join()方法 感知一個子程序的結束
p.daemon = True 在start()前插入程式碼 就是將該子程序設定為守護程序
守護程序在主程序程式碼讀完後結束,此時主程序可能還沒有結束 而是在等待別的子程序執行結束才結束
p.is_alive檢驗一個程序是否還活著
# import time # from multiprocessing import Process # def fun(): # while 1: # print('活著') # time.sleep(0.1) # def fun2(): # print('fun2 start') # time.sleep(10) # print('fub2 end') # # if __name__=='__main__': # p = Process(target=fun) # p.daemon = True # 設定子程序為守護程序 # p.start() # p2 = Process(target=fun2) # p2.start() # print(p2.is_alive()) # is_alive檢驗一個程序是否活著 # print(p2.name) # # i = 0 # while i<5: # print('socket server') # time.sleep(1) # i += 1上面方法的使用
程序鎖:
多個程序同時搶一個數據就會出現資料安全問題,此時需要用程序鎖限制資料,同一時間只能被一個程序操作
歸還資料之後才能被另外的程序操作。此時犧牲效率保證安全
import time import json from multiprocessing import Process from multiprocessing import Lock def show(i): with open('ticket') as f: dic = json.load(f) print('餘票:%s'%dic['ticket']) def buy_tk(i,lock): lock.acquire() with open('ticket') as f: dic = json.load(f) time.sleep(0.1) if dic['ticket']>0: dic['ticket'] -= 1 print('\033[32m%s買到票了\033[0m'%i) else: print('\033[31m%s沒買到票\033[0m'%i) time.sleep(0.1) with open('ticket','w') as f: json.dump(dic,f) lock.release() if __name__=='__main__': for i in range(10): p = Process(target=show,args=(i,)) p.start() lock = Lock() p.join() for i in range(10): p = Process(target=buy_tk,args=(i,lock)) p.start()火車票問題
事件:
一個訊號是所有程序都進入阻塞狀態,也可以控制解除阻塞
事件被建立 預設是阻塞狀態
# from multiprocessing import Event,Process # import time # import random # # def cars(i,e): # if not e.is_set(): # print('\033[31m%s等待\033[0m'%i) # e.wait() # print('\033[32m%s通過\033[0m'%i) # # # # def light(e): # while 1: # if e.is_set(): # e.clear() # print('\033[31m紅燈亮了\033[0m') # # else: # e.set() # print('\033[32m綠燈亮了\033[0m') # # time.sleep(2) # # if __name__=='__main__': # e = Event() # p = Process(target=light,args=(e,)) # p.start() # for i in range(20): # car = Process(target=cars,args=(i,e)) # car.start() # time.sleep(random.random())紅綠燈問題
資訊量:
同一時間只能n程序進行操作,房間上n把鎖,只有等裡面的出來,外面的才能再進去
import time import random from multiprocessing import Process from multiprocessing import Semaphore def func(i,sem): sem.acquire() print('%i走進ktv'%i) time.sleep(random.randint(2,5)) print('%i走出ktv'%i) sem.release() if __name__=='__main__': sem = Semaphore(4) # 給房間上鎖 每次只能進4個 for i in range(20): Process(target=func,args=(i,sem)).start()
佇列:
佇列能實現程序之間的通訊
# from multiprocessing import Queue # # q = Queue(5) # q.put(1) #put方法 往佇列裡放 # q.put(2) # q.put(3) # q.put(4) # q.put(5) # # q.put(5) #若佇列已滿 再放會阻塞 # print(q.full()) #判斷佇列是否滿了 # q.get() #get方法 從佇列裡取 # q.get() # q.get() # q.get() # q.get() # # q.get() #若佇列為空 再取就會阻塞 # print(q.empty()) # 佇列是否為空 # try: # q.get_nowait() # 有值就取 沒有報錯 不等待 不阻塞 # except: # print('已經沒有值了') # 用Queue實現程序之間通訊 資料傳遞 from multiprocessing import Queue,Process def prodect(q): q.put('hello') def consume(q): print(q.get()) if __name__=='__main__': q = Queue() p = Process(target=prodect,args=(q,)) p.start() c = Process(target=consume,args=(q,)) c.start()
from multiprocessing import Queue,Process import time import random def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) fo = '%s%s'%(food,i) print('%s生產了%s'%(name,fo)) q.put(fo) def consumer(name,q): while 1: food = q.get() print('\033[31m%s消費了%s\033[0m'%(name,food)) time.sleep(random.randint(1,3)) if __name__=='__main__': q = Queue() p = Process(target=producer,args=('dh','包子',q)) p2 = Process(target=producer,args=('大黃','榴蓮',q)) c = Process(target=consumer,args=('小包',q)) c2 = Process(target=consumer,args=('二黃',q)) p.start() p2.start() c.start() c2.start() p.join() p2.join()生產者消費者模型 queue
該方法最後會阻塞,程序不會結束
from multiprocessing import JoinableQueue,Process import time import random def producer(name,food,q): for i in range(4): time.sleep(random.randint(1,3)) fo = '%s%s'%(food,i) print('%s生產了%s'%(name,fo)) q.put(fo) q.join() #阻塞 直到佇列中的所有資料全部被處理完畢 def consumer(name,q): while 1: food = q.get() print('\033[31m%s消費了%s\033[0m'%(name,food)) time.sleep(random.randint(1,3)) q.task_done() if __name__=='__main__': q = JoinableQueue() p = Process(target=producer,args=('dh','包子',q)) p2 = Process(target=producer,args=('大黃','榴蓮',q)) c = Process(target=consumer,args=('小包',q)) c2 = Process(target=consumer,args=('二黃',q)) p.start() p2.start() c.daemon = True c2.daemon = True c.start() c2.start() p.join() p2.join()生產者消費者模型 joinablequeue
管道:
管道的返回值是兩個管道口,對應輸入與輸出,可以實現程序之間的通訊
若管道口關閉,仍從管道內取值,管道中沒有值時就會丟擲異常
#生產者消費者模型用管道實現 from multiprocessing import Pipe,Process,Lock import time import random def producer(con,pro,name,food): con.close() for i in range(10): fo = '%s%s'%(food,i) print('%s生產了%s'%(name,fo)) pro.send(fo) time.sleep(random.randint(1,3)) pro.close() def consumer(con,pro,name,lock): pro.close() while 1: try: lock.acquire() food = con.recv() lock.release() print('%s吃了%s'%(name,food)) time.sleep(random.randint(1,3)) except EOFError: con.close() break if __name__=='__main__': con,pro = Pipe() lock = Lock() p = Process(target=producer,args=(con,pro,'dh','包子')) p.start() c = Process(target=consumer,args=(con,pro,'大黃',lock)) c.start() # c2 = Process(target=consumer,args=(con,pro,'大黃2',lock)) # c2.start() con.close() pro.close()
當存在多個消費者時,就是出現多個消費者搶資源,從而資料不安全。
解決方法就是加鎖
管道加鎖就實現了佇列。
程序中的資料共享
rom multiprocessing import Manager,Process,Lock
def main(dic,l):
l.acquire()
dic['con']-=1
l.release()
if __name__=='__main__':
m = Manager()
l = Lock()
dic = m.dict({'con':100}) # 資料共享
p_lst = []
for i in range(50):
p = Process(target=main,args=(dic,l))
p.start()
p_lst.append(p)
for i in p_lst:i.join()
print(dic)
正常情況下,執行完資料的結果應該是50,但實際上每次的結果可能都不同
說明資料共享的同時,有子程序同時拿到了資料,進行了重複的操作
資料共享是不安全的,解決方法是加鎖
程序池:
每開啟一個子程序,都要開啟一個屬於這個子程序記憶體空間
作業系統排程效率降低
程序池:在python中建立一個裝程序的池子,指定池子中有一定數量的程序,這些程序被建立好,等待被使用
每次進入一定數量的程序,一個處理完話另一個進
# 程序池 map方法 與開啟多程序時間上的比較 # from multiprocessing import Process,Pool # import time # # def fun(n): # print(n+1) # # if __name__=='__main__': # st = time.time() # pool = Pool(5) # 程序池 提升效率 # pool.map(fun,range(100)) # t1 = time.time()-st # # enn = time.time() # p_lst = [] # for i in range(100): # p = Process(target=fun,args=(i,)) # p.start() # p_lst.append(p) # for p in p_lst:p.join() # t2 = time.time()-enn # print(t1,t2)
map方法中要放一個可迭代的
# from multiprocessing import Pool # import time,os # def fun(n): # print('%sstart'%n,os.getpid()) # time.sleep(0.3) # print('%send'%n,os.getpid()) # # if __name__=='__main__': # p = Pool(5) # for i in range(10): # p.apply_async(fun,args=(i,)) #非同步接收 # p.close() # 結束程序池接受任務 # p.join() # 感知程序池中的任務結束
p.apply()為同步方法
程序池的返回值
# from multiprocessing import Pool # import time # def fun(i): # time.sleep(0.5) # return i*i # if __name__=='__main__': # p = Pool(5) # for i in range(10): # res = p.apply(fun,args=(i,)) #同步 一個一個列印 # print(res) # res_l = [] # for i in range(10): # res = p.apply_async(fun,args=(i,)) # res_l.append(res) # for res in res_l: # get方法會阻塞 放在生成程序的外面 # print(res.get()) #非同步方法 一次列印5個(程序池為5) # res = p.map(fun,range(10)) # print(res) # 一次列印所有[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # 所有資料處理完 一塊返回 結果是一個列表
map簡單呼叫非同步函式apply_async非同步實現更強大
程序池的回撥函式
# from multiprocessing import Pool # import os # def func1(n): # print('in func1',os.getpid()) # return n*n # def func2(nn): # print(nn,os.getpid()) # # if __name__=='__main__': # print(os.getpid()) # p = Pool(5) # p.apply_async(func1,args=(10,),callback=func2) #callback回撥函式 # p.close() # 執行func1的結果作為func2的引數傳進回撥函式 # p.join() # 回撥函式在主程序空間
第一個函式的返回值 作為回撥函式的引數進行處理
回撥函式是主程序中的操作
回撥函式常用於爬蟲,子程序爬取複雜的資訊,交給主程序處理,避免在切換程序過程中的網路延時
最後,一個沒反應的程式,不知道哪錯了,學了後面的再回來找補。
import requests from multiprocessing import Pool from urllib.request import urlopen # def get(url): # response = requests.get(url) # if response.status_code == 200: # return url,response.content.decode('utf-8') def get_urllib(url): ret = urlopen(url) return ret.read().decode('utf-8') def call_back(args): url,content = args print(url,len(content)) if __name__=='__main__': url_lst = [ 'https://www.baidu.com' 'https://www.sohu.com' 'https://www.sougou.com' ] p = Pool(5) for ult in url_lst: p.apply_async(get_urllib,args=(ult,),callback = call_back) p.close() p.join()