1. 程式人生 > >Day 32 process&threading_4

Day 32 process&threading_4

位置 run not pri 管理 port 等等 init 過程

線程和進程 4

一、multiprocessing模塊

  multiprocessing包是Python中的多進程管理包。 與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。

1, python的Process類的進程調用

 1 ##############  進程調用
 2 #  調用進程模塊和時間模塊,並測試多進程功能
 3 
 4 from
multiprocessing import Process 5 import time 6 7 def pp(name): 8 """ 9 函數:完成進程調用測試工作 10 :return: 11 """ 12 print("you are coming!",name,time.ctime()) 13 time.sleep(2) 14 15 if __name__ == __main__: 16 p_list = [] 17 18 # 開三個分進程,程序本身算主進程 19 for
i in range(3): 20 p = Process(target=pp,args=("%s" %i,)) # 調用方式基本和thread相似 21 p_list.append(p) 22 p.start() 23 24 for j in p_list: # 分別阻斷進程和主進程間關系 25 j.join() 26 27 print("end!",time.ctime())

2,繼承類的調用方式

 1 ######## 繼承Process類調用
 2 #
 3 
 4
from multiprocessing import Process 5 import time 6 7 class MyProcess(Process): 8 """ 9 繼承父類Process的所用屬性和功能 10 """ 11 def __init__(self): 12 Process.__init__(self) # 繼承父類的所有__init__屬性 13 14 # 實例執行start時,自動觸發run的執行,可查看系統源碼追溯到 15 def run(self): 16 print("Welcome to beijing!",self,time.ctime()) 17 time.sleep(2) 18 19 if __name__ == __main__: 20 p_list = [] 21 for i in range(3): 22 xx = MyProcess() 23 xx.start() 24 p_list.append(xx) 25 26 for j in p_list: 27 j.join() 28 29 print("END!",time.ctime())

二、進程間的通訊

2、1 進程隊列queue

 1 from multiprocessing import Process,Queue
 2 
 3 def xx(q,n):
 4     q.put(n*n+6)
 5     # 測試子線程隊列的位置
 6     print("son process of:",id(q))
 7 
 8 if __name__ == __main__:
 9     q = Queue()
10     # 測試主線程隊列的位置
11     print("main process of: ",id(q))
12 
13     for i in range(3):
14         p = Process(target=xx,args=(q,i,))
15         p.start()
16 
17     print(q.get())
18     print(q.get())
19     print(q.get())
20 
21 ‘‘‘
22 # 事實是為了證明:Queue雖然實現了線程間的交流,但是實際是在不同線程開辟了不一樣的內存空間。然而linux優化,結果就如下了:mac是一樣的
23 main process of:  4319925192
24 son process of: 4319925192
25 6
26 son process of: 4319925192
27 7
28 son process of: 4319925192
29 10
30 ‘‘‘

2、2 進程管道 Pipe

 1 ############## Pipe 進程管道
 2 """
 3 Pipe()返回的兩個連接對象代表管道的兩端。
 4     每個連接對象都有send()和recv()方法(等等)。
 5     請註意,如果兩個進程(或線程)嘗試同時讀取或寫入管道的同一端,管道中的數據可能會損壞。
 6 """
 7 
 8 from multiprocessing import Process,Pipe
 9 
10 def x_l(l_conn):
11     """
12     開辟一個進程負責lc的通話
13     :param l_conn:
14     :return:
15     """
16     l_conn.send("Welcome to beijing!")
17     response = l_conn.recv()
18     print("x:",response)
19     l_conn.close()
20 
21 if __name__ == __main__:
22     x_conn,l_conn = Pipe()
23     p = Process(target=x_l,args=(l_conn,))  # 開子進程,負責lc
24     p.start()
25     res = x_conn.recv()
26     print("l:",res)
27     x_conn.send("Im coming!")
28     p.join()
29 
30 """
31 l: Welcome to beijing!
32 x: Im coming!
33 """

2,3 Manager 數據共享

 1 ############## Manager 數據共享:一個數據去更改另一個進程裏的數據
 2 
 3 from multiprocessing import Manager,Process
 4 
 5 def xl(Mlist,i):
 6     Mlist.append(i)
 7 
 8 if __name__ == __main__:
 9     manager = Manager()  # 實例一個Manager
10     Mlist = manager.list([1,"a"])  # 數據共享類型為列表,也可以用字典等
11     l = []
12 
13     # 開子進程,並往主線程共享列表添加變量
14     for i in range(5):
15         p = Process(target=xl,args=(Mlist,i,))
16         p.start()
17         l.append(p)
18 
19     for j in l:
20         j.join()
21 
22     print(Mlist

2,4 Pool 進程池

 1 ############## Pool
 2 """
 3 進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,
 4 如果進程池序列中沒有可供使用的進進程,那麽程序就會等待,
 5 直到進程池中有可用進程為止。
 6 """
 7 from multiprocessing import Pool
 8 import time
 9 
10 def xl(n):
11     print(n)
12     time.sleep(2)
13     print("END!")
14 
15 if __name__ == __main__:
16     pool_obj = Pool()  # 進程池,默認進程數是cpu核數,其中os.cpu_count()查看
17     for i in range(20):
18         pool_obj.apply_async(func=xl,args=(i,))
19     pool_obj.close()  # 執行後,不會有新等進程進入進程池
20     pool_obj.join()  # join()在close()之後,套路,牢記!!!
21 
22     print("ALL IS OVER!")

1 p.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()
2 p.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。
3    
4 p.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成5 P.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用

三, 協程

1 """
2 協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。
3 對比操作系統控制線程的切換,用戶在單線程內控制協程的切換,優點如下:
4 1.  協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級
5 2. 單線程內就可以實現並發的效果,最大限度地利用cpu
6 """
 1 import time
 2 
 3 """
 4 傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。
 5 如果改用協程,生產者生產消息後,直接通過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。
 6 """
 7 # 註意到consumer函數是一個generator(生成器):
 8 # 任何包含yield關鍵字的函數都會自動成為生成器(generator)對象
 9 
10 def consumer():
11     r = ‘‘
12     while True:
13         # 3、consumer通過yield拿到消息,處理,又通過yield把結果傳回;
14         #    yield指令具有return關鍵字的作用。然後函數的堆棧會自動凍結(freeze)在這一行。
15         #    當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,
16         #    就會從yield代碼的下一行開始,繼續執行,再返回下一次叠代結果。通過這種方式,叠代器可以實現無限序列和惰性求值。
17         n = yield r
18         if not n:
19             return
20         print([CONSUMER] ←← Consuming %s... % n)
21         time.sleep(1)
22         r = 200 OK
23 def produce(c):
24     # 1、首先調用c.next()啟動生成器
25     next(c)
26     n = 0
27     while n < 5:
28         n = n + 1
29         print([PRODUCER] →→ Producing %s... % n)
30         # 2、然後,一旦生產了東西,通過c.send(n)切換到consumer執行;
31         cr = c.send(n)
32         # 4、produce拿到consumer處理的結果,繼續生產下一條消息;
33         print([PRODUCER] Consumer return: %s % cr)
34     # 5、produce決定不生產了,通過c.close()關閉consumer,整個過程結束。
35     c.close()
36 if __name__==__main__:
37     # 6、整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。
38     c = consumer()
39     produce(c)
40     
41     
42 ‘‘‘
43 result:
44 
45 [PRODUCER] →→ Producing 1...
46 [CONSUMER] ←← Consuming 1...
47 [PRODUCER] Consumer return: 200 OK
48 [PRODUCER] →→ Producing 2...
49 [CONSUMER] ←← Consuming 2...
50 [PRODUCER] Consumer return: 200 OK
51 [PRODUCER] →→ Producing 3...
52 [CONSUMER] ←← Consuming 3...
53 [PRODUCER] Consumer return: 200 OK
54 [PRODUCER] →→ Producing 4...
55 [CONSUMER] ←← Consuming 4...
56 [PRODUCER] Consumer return: 200 OK
57 [PRODUCER] →→ Producing 5...
58 [CONSUMER] ←← Consuming 5...
59 [PRODUCER] Consumer return: 200 OK
60 ‘‘‘

3,1 Gevent

 1 #############   協程 greenlet
 2 
 3 from gevent import monkey
 4 monkey.patch_all()
 5 import gevent
 6 from urllib import request
 7 import time
 8 
 9 def xl(url):
10     print("GET:%s" %url)
11     res = request.urlopen(url)
12     data = res.read()
13     print("%d bytes recevied from %s" %(len(data),url))
14 
15 start = time.time()
16 
17 gevent.joinall([
18     # gevent.spawn(xl,"http://www.xiaohuar.com"),
19     gevent.spawn(xl,"http://www.mmjpg.com"),
20     gevent.spawn(xl,"http://www.fengniao.com")
21 ])
22 
23 print(time.time()-start)

Greenlet Gevent 自動切換的牛逼之處,可以的就開幹

Day 32 process&threading_4