網路程式設計中的鎖與佇列
阿新 • • 發佈:2019-01-09
程序的其他方法:
1 import os,time
2 from multiprocessing import Process
3 def f1():
4 print('子程序的pid',os.getpid()) #檢視當前子程序的id
5 print('子程序的父程序的pid',os.getppid())
6 print(123)
7
8 def f2():
9 print('子程序的pid',)
10 print('父程序的pid',)
11 print(123)
12
13 if __name__ == '__main__ ':
14 p1 = Process(target=f1,name='洪七')
15 p2 = Process(target=f2)
16 p1.start()
17 p2.start()
18 print(p1.name) #檢視當前子程序的name,可以進行賦值更改name
19 print('子程序的pid',p1.pid) #檢視當前子程序的id
20 print('父程序的pid',os.getpid())
21 def f3():
22 time.sleep(5)
23 print('子程序2')
24 if __name__ == '__main__':
25 p = Process(target=f3,)
26 p.start()
27 print(p.is_alive()) #判斷子程序是否還活著,是否還在執行
28 p.terminate() #給作業系統傳送一個結束程序的訊號
29 print(p.is_alive())
驗證程序之間是空間隔離的:
1 from multiprocessing import Process
2
3 num = 100
4
5 def f1():
6 global num
7 num = 3
8 print('子程序中的num',num)
9
10 print('>>>>>',num)
11 if __name__ == '__main__':
12 p = Process(target=f1,)
13 p.start()
14 p.join()
15 print('主程序中的num',num)
殭屍程序與孤兒程序(簡單瞭解 一下)
1 一:殭屍程序(有害)
2 殭屍程序:一個程序使用fork建立子程序,如果子程序退出,而父程序並沒有呼叫wait或waitpid獲取子程序的狀態資訊,那麼子程序的程序描述符仍然儲存在系統中。這種程序稱之為僵死程序。詳解如下
3
4 我們知道在unix/linux中,正常情況下子程序是通過父程序建立的,子程序在建立新的程序。子程序的結束和父程序的執行是一個非同步過程,即父程序永遠無法預測子程序到底什麼時候結束,如果子程序一結束就立刻回收其全部資源,那麼在父程序內將無法獲取子程序的狀態資訊。
5
6 因此,UNⅨ提供了一種機制可以保證父程序可以在任意時刻獲取子程序結束時的狀態資訊:
7 1、在每個程序退出的時候,核心釋放該程序所有的資源,包括開啟的檔案,佔用的記憶體等。但是仍然為其保留一定的資訊(包括程序號the process ID,退出狀態the termination status of the process,執行時間the amount of CPU time taken by the process等)
8 2、直到父程序通過wait / waitpid來取時才釋放. 但這樣就導致了問題,如果程序不呼叫wait / waitpid的話,那麼保留的那段資訊就不會釋放,其程序號就會一直被佔用,但是系統所能使用的程序號是有限的,如果大量的產生僵死程序,將因為沒有可用的程序號而導致系統不能產生新的程序. 此即為殭屍程序的危害,應當避免。
9
10 任何一個子程序(init除外)在exit()之後,並非馬上就消失掉,而是留下一個稱為殭屍程序(Zombie)的資料結構,等待父程序處理。這是每個子程序在結束時都要經過的階段。如果子程序在exit()之後,父程序沒有來得及處理,這時用ps命令就能看到子程序的狀態是“Z”。如果父程序能及時 處理,可能用ps命令就來不及看到子程序的殭屍狀態,但這並不等於子程序不經過殭屍狀態。 如果父程序在子程序結束之前退出,則子程序將由init接管。init將會以父程序的身份對殭屍狀態的子程序進行處理。
11
12 二:孤兒程序(無害)
13
14 孤兒程序:一個父程序退出,而它的一個或多個子程序還在執行,那麼那些子程序將成為孤兒程序。孤兒程序將被init程序(程序號為1)所收養,並由init程序對它們完成狀態收集工作。
15
16 孤兒程序是沒有父程序的程序,孤兒程序這個重任就落到了init程序身上,init程序就好像是一個民政局,專門負責處理孤兒程序的善後工作。每當出現一個孤兒程序的時候,核心就把孤 兒程序的父程序設定為init,而init程序會迴圈地wait()它的已經退出的子程序。這樣,當一個孤兒程序淒涼地結束了其生命週期的時候,init程序就會代表黨和政府出面處理它的一切善後工作。因此孤兒程序並不會有什麼危害。
17
18 我們來測試一下(建立完子程序後,主程序所在的這個指令碼就退出了,當父程序先於子程序結束時,子程序會被init收養,成為孤兒程序,而非殭屍程序),檔案內容
19
20 import os
21 import sys
22 import time
23
24 pid = os.getpid()
25 ppid = os.getppid()
26 print 'im father', 'pid', pid, 'ppid', ppid
27 pid = os.fork()
28 #執行pid=os.fork()則會生成一個子程序
29 #返回值pid有兩種值:
30 # 如果返回的pid值為0,表示在子程序當中
31 # 如果返回的pid值>0,表示在父程序當中
32 if pid > 0:
33 print 'father died..'
34 sys.exit(0)
35
36 # 保證主執行緒退出完畢
37 time.sleep(1)
38 print 'im child', os.getpid(), os.getppid()
39
40 執行檔案,輸出結果:
41 im father pid 32515 ppid 32015
42 father died..
43 im child 32516 1
44
45 看,子程序已經被pid為1的init程序接收了,所以殭屍程序在這種情況下是不存在的,存在只有孤兒程序而已,孤兒程序宣告週期結束自然會被init來銷燬。
46
47
48 三:殭屍程序危害場景:
49
50 例如有個程序,它定期的產 生一個子程序,這個子程序需要做的事情很少,做完它該做的事情之後就退出了,因此這個子程序的生命週期很短,但是,父程序只管生成新的子程序,至於子程序 退出之後的事情,則一概不聞不問,這樣,系統執行上一段時間之後,系統中就會存在很多的僵死程序,倘若用ps命令檢視的話,就會看到很多狀態為Z的程序。 嚴格地來說,僵死程序並不是問題的根源,罪魁禍首是產生出大量僵死程序的那個父程序。因此,當我們尋求如何消滅系統中大量的僵死程序時,答案就是把產生大 量僵死程序的那個元凶槍斃掉(也就是通過kill傳送SIGTERM或者SIGKILL訊號啦)。槍斃了元凶程序之後,它產生的僵死程序就變成了孤兒進 程,這些孤兒程序會被init程序接管,init程序會wait()這些孤兒程序,釋放它們佔用的系統程序表中的資源,這樣,這些已經僵死的孤兒程序 就能瞑目而去了。
51
52 四:測試
53 #1、產生殭屍程序的程式test.py內容如下
54
55 #coding:utf-8
56 from multiprocessing import Process
57 import time,os
58
59 def run():
60 print('子',os.getpid())
61
62 if __name__ == '__main__':
63 p=Process(target=run)
64 p.start()
65
66 print('主',os.getpid())
67 time.sleep(1000)
68
69
70 #2、在unix或linux系統上執行
71 [[email protected] ~]# python3 test.py &
72 [1] 18652
73 [[email protected] ~]# 主 18652
74 子 18653
75
76 [[email protected] ~]# ps aux |grep Z
77 USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
78 root 18653 0.0 0.0 0 0 pts/0 Z 20:02 0:00 [python3] <defunct> #出現殭屍程序
79 root 18656 0.0 0.0 112648 952 pts/0 S+ 20:02 0:00 grep --color=auto Z
80
81 [[email protected] ~]# top #執行top命令發現1zombie
82 top - 20:03:42 up 31 min, 3 users, load average: 0.01, 0.06, 0.12
83 Tasks: 93 total, 2 running, 90 sleeping, 0 stopped, 1 zombie
84 %Cpu(s): 0.0 us, 0.3 sy, 0.0 ni, 99.7 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
85 KiB Mem : 1016884 total, 97184 free, 70848 used, 848852 buff/cache
86 KiB Swap: 0 total, 0 free, 0 used. 782540 avail Mem
87
88 PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
89 root 20 0 29788 1256 988 S 0.3 0.1 0:01.50 elfin
90
91
92 #3、
93 等待父程序正常結束後會呼叫wait/waitpid去回收殭屍程序
94 但如果父程序是一個死迴圈,永遠不會結束,那麼該殭屍程序就會一直存在,殭屍程序過多,就是有害的
95 解決方法一:殺死父程序
96 解決方法二:對開啟的子程序應該記得使用join,join會回收殭屍程序
97 參考python2原始碼註釋
98 class Process(object):
99 def join(self, timeout=None):
100 '''
101 Wait until child process terminates
102 '''
103 assert self._parent_pid == os.getpid(), 'can only join a child process'
104 assert self._popen is not None, 'can only join a started process'
105 res = self._popen.wait(timeout)
106 if res is not None:
107 _current_process._children.discard(self)
108
109 join方法中呼叫了wait,告訴系統釋放殭屍程序。discard為從自己的children中剔除
守護程序:(**)
主程序建立守護程序
其一:守護程序會在主程序程式碼執行結束後就終止
其二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children
注意:程序之間是互相獨立的,主程序程式碼執行結束,守護程序隨即終止
守護程序程式碼展示:
1 import time
2 from multiprocessing import Process
3
4 def f1():
5 time.sleep(3)
6 print('xxxx')
7
8 def f2():
9 time.sleep(5)
10 print('普通子程序的程式碼')
11 if __name__ == '__main__':
12 p = Process(target=f1,)
13 p.daemon = True #將該程序設定為守護程序,必須寫在start之前,意思如果我的主程序程式碼執行結束了,你這個子程序不管執行到什麼地方,都直接結束
14 p.start()
15
16 #開啟一個普通的子程序來驗證一下守護程序的結束只和主程序的程式碼執行結束有關係,而整個程式的結束需要主程序和普通的子程序的程式碼都執行結束才結束
17 p2 = Process(target=f2,)
18 p2.start()
19 #等待2號普通程序的結束,才繼續執行下面主程序中的程式碼
20 # p2.join()
21 #守護程序會跟跟著父程序的程式碼執行結束,就結束
22 print('主程序結束')
程序鎖(同步鎖/互斥鎖) *****
程序之間資料不共享,但是共享同一套檔案系統,所以訪問同一個檔案,或同一個列印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。
簡單的程序鎖程式碼展示
1 # 互斥鎖/程序鎖/同步鎖
2 # import json
3 import time
4 from multiprocessing import Process,Lock
5
6 def show_t(i):
7
8 with open('ticket','r',encoding='utf-8') as f:
9 ticket_data = f.read()
10 # print(ticket_data)
11 t_data = eval(ticket_data)
12 # print(t_data,type(t_data))
13 print('%s查詢剩餘票數為%s'%(i,t_data['count']))
14
15 def get_t(i,l1):
16 l1.acquire()
17 with open('ticket', 'r', encoding='utf-8') as f:
18 ticket_data = f.read()
19 # print(ticket_data)
20 t_data = eval(ticket_data)
21 # print(t_data,type(t_data))
22 # print('%s查詢剩餘票數為%s' % (i, t_data['count']))
23 if t_data['count'] > 0:
24 t_data['count'] -= 1
25 print('%s搶票成功'%i)
26 time.sleep(0.2)
27 with open('ticket', 'w') as f:
28 f.write(str(t_data))
29
30 else:
31 print('沒票了!!!')
32 l1.release()
33
34 if __name__ == '__main__':
35 l1 = Lock()
36 for i in range(10):
37 p1 = Process(target=show_t,args=(i,))
38 p1.start()
39 for i in range(10):
40 p2 = Process(target=get_t,args=(i,l1) )
41 p2.start()
資料共享:
1 import time
2 from multiprocessing import Process,Manager,Lock
3
4 def f1(m_d,l2):
5 # m_d['num'] -= 1 #
6 with l2:
7 # l2.acquire()
8 tmp = m_d['num']
9 tmp -= 1
10 time.sleep(0.1)
11 m_d['num'] = tmp
12 # l2.release()
13
14 if __name__ == '__main__':
15 m = Manager()
16 l2 = Lock()
17 m_d = m.dict({'num':100})
18 p_list = []
19 for i in range(10):
20 p = Process(target=f1,args=(m_d,l2))
21 p.start()
22 p_list.append(p)
23
24 [pp.join() for pp in p_list]
25
26 print(m_d['num'])
for 迴圈建立多程序:
1 import time
2 from multiprocessing import Process
3
4
5 def f1():
6 time.sleep(0.5)
7 print('xxx')
8
9 if __name__ == '__main__':
10 p_list = []
11 #for迴圈建立子程序,並且完成主程序等待所有子程序執行結束,才繼續執行
12 for i in range(10):
13 p = Process(target=f1,)
14 p.start()
15 p_list.append(p)
16 p.join()
17 # for pp in p_list:
18 # pp.join()
19
20 print('主程序結束')
佇列(Queue):
程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的。佇列就像一個特殊的列表,但是可以設定固定長度,並且從前面插入資料,從後面取出資料,先進先出。
Queue([maxsize]) 建立共享的程序佇列。
引數 :maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。
底層佇列使用管道和鎖實現。
基於佇列的程序通訊:
1 from multiprocessing import Process,Queue
2
3 def f1(q):
4 q.put('約嗎?')
5
6 if __name__ == '__main__':
7 q = Queue(3)
8
9 p = Process(target=f1,args=(q,))
10 p.start()
11
12 son_p_msg = q.get()
13
14 print('來自子程序的訊息:',son_p_msg)
利用佇列實現一個生產消費模型:
1 import time
2 from multiprocessing import Process,JoinableQueue
3 #生產者
4 def producer(q):
5 for i in range(10):
6 time.sleep(0.2)
7 s = '大包子%s號'%i
8 print(s+'新鮮出爐,拿去用')
9 q.put(s)
10 q.join() #就等著task_done()訊號的數量,和我put進去的數量相同時,才繼續執行
11 print('所有的任務都被處理了,繼續潛行吧騷年們')
12
13 def consumer(q):
14 while 1:
15 time.sleep(0.5)
16 baozi = q.get()
17
18 print(baozi+'被吃了')
19 q.task_done() #給佇列傳送一個取出的這個任務已經處理完畢的訊號
20
21 if __name__ == '__main__':
22 # q = Queue(30)
23 q = JoinableQueue(30) #同樣是一個長度為30的佇列
24
25 pro_p = Process(target=producer,args=(q,))
26 con_p = Process(target=consumer,args=(q,))
27 pro_p.start()
28 con_p.daemon = True
29 con_p.start()
30
31
32 pro_p.join()
33 print('主程序結束')
程序佇列 ***** Queue()
Q = Queue(5)
Q.put() #滿了會等待
Q.get() #沒有資料了會等待
Q.qsize()
Q.empty() 不可靠
Q.full()不可靠
Q.get_nowait() #不等待,但是報錯
Q.put_nowait() #不等待,但是報錯