Python_Example_Process 程序 學習/經驗/示例
阿新 • • 發佈:2018-11-16
Author: 楚格
2018-11-16 20:08:32
IDE: Pycharm2018.02 Python 3.7
KeyWord : 程序 multiprocess Process
Explain:
--------
1 # coding=utf-8 2 #--------------------------------- 3 ''' 4 # Author : chu ge 5 # Function: 程序 學習 6 # 7''' 8 #--------------------------------- 9 ''' 10 # -------------------------------- 11 # 匯入模組 12 # 1.系統庫 13 # 2.第三方庫 14 # 3.相關定義庫 15 # -------------------------------- 16 ''' 17 # 1.系統庫 18 import sys 19 import os 20 import time 21 import random 22 23 #2.第三方庫 24 from multiprocessing importProcess 25 from multiprocessing import Pool 26 from multiprocessing import Queue 27 from multiprocessing import Manager 28 29 30 31 # 32 ''' 33 #》》》》》》》》》》 34 程序 35 36 ---------------------------------------------- 37 1.程序的建立 38 -fork 即 ret = os.fork() 39 # window中沒有fork,Linux中才有fork40 41 ----------------------- 42 1.1程序VS程式 43 編寫完畢的程式碼,在沒有執行的時候,稱為程式;正在執行的程式碼,成為程序; 44 程序,除了包含程式碼之外,還有需要執行環境等,所以和程式有區別 45 46 ----------------------- 47 1.2 fork()。python的OS模組,包括fork. 48 49 e.g: 50 pid = os.fork() 51 if pid == 0: 52 print("1") 53 else: 54 print("2") 55 說明: 56 程式執行到os.fork()時,作業系統會建立一個新的程序(子程序), 57 然後複製父程序的所有資訊到子程序中 58 然後父程序和子程序都會從fork()函式中得到一個返回值, 59 在子程序中這個值一定是0,而父程序中是子程序的ID號 60 61 在Linux中,fork()系統函式,非常特殊。 62 普通的函式呼叫,呼叫一次,但fork()呼叫一次,返回二次, 63 因為作業系統自動把當前程序(稱為父程序)複製了一份(稱為子程序), 64 然後,分別在父程序和子程序內返回。 65 66 ----------------------- 67 1.3子程序永遠返回0,而父程序返回子程序的ID 68 主程序即父程序 69 ret > 0 70 新建程序即子程序 71 ret = 0 72 e.g: 73 ret = os.fork() 74 print(ret) 75 result: 76 2506 77 0 78 79 ----------------------- 80 1.4一個父程序可以fork出很多子程序,所以父程序要記下每個子程序的ID, 81 而子程序只需要呼叫.getpid()就可以拿到父程序的ID 82 getpid() # 獲取當前程序的值,子程序獲取主程序的ID 83 getppid() #獲取父程序的ID 84 85 e.g: 86 ret = os.fork() 87 print(ret) 88 if ret > 0: 89 print("父程序---") 90 pritn("%d"%os.getpid()) 91 else: 92 print("子程序--") 93 pritn("%d - %d"%(os.getpid(),os.getppid())) 94 95 result: 96 20570 # 父程序 97 父程序--- # 父程序中fork的返回值, 98 20569 # 就是剛剛創建出的子程序的ID 99 0 # 子程序 100 子程序-- 101 20570 - 20569 102 103 ----------------------- 104 1.5主程序結束不會因為子程序沒有結束,而等待子程序 105 e.g: 106 ret = os.fork() 107 108 if ret == 0: 109 print("--子程序--") 110 time.sleep(5) 111 print("--子程序 over ---",end="") 112 else: 113 print("---父程序") 114 time.sleep(3) 115 116 ----------------------- 117 1.6全域性變數在多個程序中,不共享多程序修改全域性變數 118 e.g: 119 g_num = 100 120 ret = os.fork() 121 122 if ret == 0: 123 print("--process 1--") 124 g_num +=1 125 print("--process 1 ==%d"% g_num) 126 else: 127 time.sleep(3) 128 print("---process 2 ----") 129 print("--process 2 ==%d"% g_num) 130 131 result: 132 --process 1-- 133 --process 2 ==101 134 ---process 2 ---- 135 --process 2 ==100 136 137 ----------------------- 138 1.7程序內容互不影響 139 程序間通訊 : 140 管道和訊息佇列 同一臺電腦 141 網路 不同臺電腦 142 143 多次fork的問題 144 # 父程序 145 ret = os.fork() 146 ret = os.fork() 147 ret = os.fork() 148 並列子程序 2^n = 2^3 = 8 149 150 e.g: 151 # 父程序 152 ret = os.fork() 153 if ret == 0: 154 print("1) #子程序 155 else: 156 print("2") #父程序 157 158 # 父子程序 159 ret = os.fork() 160 if ret == 0: 161 print("11") 162 else: 163 print("22") 164 result: 165 2 166 22 167 11 168 1 169 22 170 11 171 一共4個程序 172 173 e.g: 174 ret = os.fork() 175 if ret == 0: 176 print("1) 177 else: 178 print("2") 179 ret = os.fork() 180 if ret == 0: 181 print("11") 182 else: 183 print("22") 184 result: 185 2 186 22 187 11 188 1 189 一共三個程序 190 191 ---------------------------------------------- 192 2. multiprocessing模組 Windows環境下使用!!! 193 194 ----------------------- 195 2.1 提供了一個process類代表一個程序物件 196 197 建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項, 198 用start()方法啟動,這樣建立程序比fork()還要簡單。 199 join()方法等待子程序結束後,在繼續下執行,通常用於程序間的同步 200 201 2.1.1 Process語法結構如下: 202 Process([group [,target [,name [,args [,kwargs]]]]]) 203 group: 大多數情況下用不到 204 target: 表示這個程序例項所呼叫的物件 205 name: 未當前程序例項的別名 206 args: 表示呼叫物件的位置引數元組 207 kwargs: 表示呼叫物件的關鍵字引數字典 208 209 2.1.2 Process類常用方法: 210 is_alive(): 判斷程序例項是否還在執行; 211 join([timeout]): 是否等待程序例項執行結束,或者等待多少秒 212 start(): 啟動程序例項(建立子程序) 213 run(): 如果沒有給定target引數,對這個物件呼叫start()方法時, 214 就將執行物件中的run()方法 215 terminate(): 不管任務是否完成,立即終止。 216 217 2.1.3 Process類常用屬性: 218 name: 當前程序例項別名,預設Process-N ,N從1開始遞增 219 pid: 當前程序例項的PID值。(python id) 220 221 222 ----------------------- 223 2.2程序的建立 -Process 子類 224 繼承Process類 225 建立新的程序還能使用類的方式,可以自定義一個類,繼承Process類, 226 每次例項化這個類的時候,就等同於例項化一個程序物件。 227 228 229 ---------------------------------------------- 230 3. 程序池 Pool 231 232 ----------------------- 233 3.1 當需要建立的子程序數量不多時, 234 可以直接利用multiprocessing中的Process動態生成多個程序, 235 但是如果是上百個甚至上千個目標,手動的去建立程序工作量巨大, 236 此時就可以用multiprocessing模組提供的Pool方法。 237 238 ----------------------- 239 3.2初始化Pool時,可以指定一個最大程序數, 240 當有新的請求提交到Pool中時, 241 如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求; 242 但如果池中的程序數已經達到指定最大值,那麼該請求就會等待, 243 直到池中有程序結束,才會建立新的程序來執行 244 245 ----------------------- 246 3.3 比較 247 apply(): 248 apply是阻塞的。首先主程序開始執行,碰到子程序,作業系統切換到子程序, 249 等待子程序執行結束後,在切換到另外一個子程序,直到所有子程序執行完畢。 250 然後在切換到主程序,執行剩餘的部分。 251 這樣跟單程序序列執行沒什麼區別,子程序是順序執行的,且子程序全部執行完畢後才繼續執行主程序。 252 apply_async(): # 建議使用此程序池方式建立例項 253 apply_async 是非同步非阻塞的。 254 即不用等待當前程序執行完畢,隨時根據系統排程來進行程序切換。 255 首先主程序開始執行,碰到子程序後,主程序仍可以先執行, 256 等到作業系統進行程序切換的時候,在交給子程序執行。 257 可以做到不等待子程序執行完畢,主程序就已經執行完畢,並退出程式。 258 259 ----------------------- 260 此程序池作為主要建立程序的方式 261 ----------------------- 262 263 -------------------------------------------- 264 4程序間通訊-Queue 265 266 ----------------------- 267 4.1 Queue的使用 268 可以使用multiprocessing模組的Queue實現多程序之間的資料傳遞, 269 Queue本身是一個訊息佇列程式, 270 271 》》佇列:先進先出 棧:先進後出 272 273 說明: 274 初始化Queue()物件時(q=Queue) 275 若括號中沒有指定最大可接受的訊息數量或者數量為負值, 276 那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭) 277 Queue.qsize():返回當前佇列包含的訊息數量。 278 Queue.empty():如果佇列為空,返回True 279 Queue.full() :如果佇列滿了,返回True 280 281 Queue.get([block [, timeout]]):獲取佇列中的一條訊息,然後將其從佇列移除,block預設值為True。 282 1)如果block使用預設值,且沒有設定timeout(),訊息佇列如果為空, 283 此時程式將被阻塞(停在讀取狀態),直到從訊息佇列讀到訊息為止。 284 如果設定了timeout,則會等待timeout秒,若還沒有讀取到任何訊息, 285 則丟擲“Queue.Empty”異常。 286 2)如果block值為False,訊息佇列如果為空,則會立刻丟擲“Queue.Empty”異常。 287 Queue.get_nowait(): 相當於Queue.get(False) 288 289 Queue.put(item,[block [, timeout]]):將item訊息寫入佇列,block預設值為True 290 1)如果block使用預設值,且沒有設定timeout(),訊息佇列如果已經沒有空間可寫入, 291 此時程式將被阻塞(停在寫入狀態),直到從訊息佇列騰出空間為止。 292 如果設定了timeout,則會等待timeout秒,若還沒有空間, 293 則丟擲“Queue.Full”異常。 294 2)如果block值為False,訊息佇列如果沒有空間寫入,則會立刻丟擲“Queue.Full”異常。 295 Queue.put_nowait(item): 相當於Queue.put(item,False) 296 297 ----------------------- 298 4.2 程序池中的Queue 299 如果要是有Pool建立程序,就要使用multiprocess.manager()的Queue()。 300 而不是multiprocess.Queue(),否則會得到錯誤訊息。 301 302 ----------------------- 303 304 305 ------------------------------------------ 306 307 ------------------------------------------------------- 308 >>>> 重要 309 ------------------------------------------------------- 310 311 2.Process 建立程序 312 2.1例程 程序 313 314 # 1.系統庫 315 import sys 316 import os 317 import time 318 import random 319 320 #2.第三方庫 321 from multiprocessing import Process 322 from multiprocessing import Pool 323 from multiprocessing import Queue 324 325 ----------------------------- 326 327 單程序 示例 328 329 # 任務1 330 def test(): 331 while True: 332 print("--- test function ---") 333 time.sleep(1) 334 335 if __name__ == "__main__": 336 pool = Process(target=test) # 例項物件 337 pool.start() # 讓這個程序開始執行 338 # test函式裡的程式碼 339 # 主程序 340 while True: 341 print("---main---") 342 time.sleep(1) 343 344 ----------------------------- 345 多程序 346 347 def Function_Test_A(): 348 print("-A1") 349 while True: 350 print("-Function_Test_A---") 351 print("-A2") 352 time.sleep(1) 353 print("-A3") 354 355 def Function_Test_B(): 356 print("--B1") 357 while True: 358 print("--Function_Test_B---") 359 print("--B2") 360 time.sleep(2) 361 print("--B3") 362 363 def Function_Test_C(): 364 print("---C1") 365 while True: 366 print("---Function_Test_C---") 367 print("---C2") 368 time.sleep(4) 369 print("---C3") 370 371 372 if __name__ == "__main__": 373 374 pool_A = Process(target=Function_Test_A) 375 pool_A.start() # 讓這個程序開始執行例項函式裡的程式碼 376 pool_B = Process(target=Function_Test_B) 377 pool_B.start() # 讓這個程序開始執行例項函式裡的程式碼 378 pool_C = Process(target=Function_Test_C) 379 pool_C.start() # 讓這個程序開始執行例項函式裡的程式碼 380 381 var = 1000 382 print("M1") 383 # 主程序 384 while True: 385 print("Main=============") 386 print("M2") 387 print(var) 388 var += 1 389 print("M3") 390 time.sleep(0.5) 391 print("M4") 392 393 說明: 394 主程序等待Process子程序先結束,即主程序最後結束.(註釋主程序,可以體現此現象) 395 396 ------------------------- 397 398 等待join() 等待超時時間 399 400 def Function_Test_D(): 401 print("----D1") 402 print("----Function_Test_D----") 403 print("----D2") 404 for i in range(random.randint(1, 5)): 405 print("----D = [%d]" % i) 406 print("----D3") 407 time.sleep(0.1) 408 print("----D4") 409 print("----D5") 410 411 if __name__ == "__main__": 412 pool_D = Process(target=Function_Test_D) 413 pool_D.start() # 讓這個程序開始執行例項函式裡的程式碼 414 pool_D.join() # 等待此物件例項子程序結束為止 415 416 print("main") 417 418 說明:pool.join() #堵塞 子程序結束後才能執行 print("main") 419 ------------------------- 420 421 繼承Process類 建立程序 422 423 class Process_Class(Process): 424 # 因為Process類本身也有__init__方法,這個子類相當於重新寫了這個方法。 425 # 但是這樣帶來新的問題,我們並沒有完全的初始化一個Process類, 426 # 所有就不建議使用。 427 # 最好的方法就是將繼承本身傳遞給Process.__init__方法, 428 # 完成這些初始化。 429 430 def __init__(self, interval): 431 Process.__init__(self) 432 self.interval = interval 433 434 # 重新寫了Process類的方法run()方法 435 def run(self): 436 print("子程序(%s)開始執行,父程序(%s)" % (os.getpid(), os.getppid())) 437 t_start = time.time() 438 time.sleep(self.interval) 439 t_stop = time.time() 440 print("(%s)執行結束,耗時%0.2f秒" % (os.getpid(), (t_stop - t_start))) 441 442 if __name__ == "__main__": 443 t_start = time.time() 444 print("當前程式程序(%s)" % (os.getpid())) 445 pool_E = Process_Class(2) 446 # target: 表示這個程序例項所呼叫的物件 447 # start(): 啟動程序例項(建立子程序) 448 # run(): 如果沒有給定target引數,對這個物件呼叫start()方法時, 449 # 就將執行物件中的run()方法 450 # terminate(): 不管任務是否完成,立即終止。 451 452 #對一個不包含target屬性的Process類執行start()就會執行這個類中run() 453 # 454 pool_E.start() 455 pool_E.join() 456 t_stop = time.time() 457 print("(%s)執行結束,耗時%0.2f秒" % (os.getpid(), (t_stop - t_start))) 458 459 result: 460 當前程式程序(24264) 461 子程序(24296)開始執行,父程序(24264) 462 (24296)執行結束,耗時2.00秒 463 (24264)執行結束,耗時2.23秒 464 465 說明:類物件建立程序,基於上述建立,只是時間不同。 466 interval = 2s 間隔時間 2秒 467 468 ------------------------- 469 470 程序池 471 472 # 程序池任務 473 def Function_Test_F(num): 474 # random.random()隨機生成0-1之間的浮點數 475 for i in range(2): 476 print("===pid=%d==num=%d="%(os.getpid(), num)) 477 time.sleep(1) 478 479 if __name__ == "__main__": 480 # 定義一個程序池,最大程序數 481 # 3表示:程序池中最多有3個程序一起執行 482 pool_name = Pool(3) 483 484 for i in range(5): 485 print("---%d---"%i) 486 #向程序池中新增任務 487 #注意:如果新增的任務數量,超過了程序池中程序的最大個數的話, 488 # 那麼不會導致新增不進入,新增到程序中的任務. 489 # 如果還沒有被執行的話,那麼此時他們會等待程序池中的, 490 # 程序完成一個任務之後,會自動的去用剛剛的那個程序, 491 # 完成當前的新任務 492 #pool_F.apply_async(要呼叫的目標,(傳遞給目標的引數()元組,)) 493 pool_name.apply_async(Function_Test_F, (i,)) 494 495 # 關閉程序池,相當於不能夠再次新增新任務了,pool_F不再接收新的請求 496 pool_name.close() 497 #等待pool_F中所有子程序執行完成,必須放在close語句之後 498 pool_name.join() 499 # 主程序 建立/新增 任務後,主程序預設不會等待程序池中的任務執行完後才結束 500 # 而是當主程序的任務做完之後 立馬結束, 501 # 如果這個地方沒join,會導致程序池中的任務不會執行 502 503 result: 504 ---0--- 505 ---1--- 506 ---2--- 507 ---3--- 508 ---4--- 509 ===pid=25644==num=0= 510 ===pid=26392==num=1= 511 ===pid=25992==num=2= 512 ===pid=25644==num=0= 513 ===pid=26392==num=1= 514 ===pid=25992==num=2= 515 ===pid=25644==num=3= 516 ===pid=26392==num=4= 517 ===pid=25644==num=3= 518 ===pid=26392==num=4= 519 520 說明:可以做建立程序標準模式 521 規定一定數量程序後,不再考慮程序堵塞問題 522 523 此程序池作為主要建立程序的方式 524 525 ------------------------- 526 527 程序間通訊 528 529 if __name__ == "__main__": 530 # 初始化一個Queue物件,最多可接受三條put訊息 531 queue = Queue(3) 532 queue.put("message 1 ") 533 queue.put("message 2 ") 534 print(queue.full()) # False 535 queue.put("message 3 ") 536 print(queue.full()) # True 537 538 # 因為訊息佇列已滿,下面的try都會丟擲異常, 539 # 第一個等待2秒再丟擲異常 540 # 第二個等待4秒再丟擲異常 541 try: 542 queue.put("message 4 ",True,2) 543 except Exception: 544 print("message full,current number: [%s]" % (queue.qsize())) 545 546 try: 547 queue.put("message 5 ",True,2) 548 except Exception: 549 print("message full,current number: [%s]" % (queue.qsize())) 550 551 # 推薦的方式,先判斷訊息佇列是否已滿,在寫入 552 if not queue.full(): 553 queue.put("message 6 ") 554 555 # 讀取訊息時,先判斷訊息佇列是否為空,在讀取 556 if not queue.empty(): 557 for var in range(queue.qsize()): 558 print(queue.get_nowait()) 559 560 result: 561 False 562 True 563 message full,current number: [3] 564 message full,current number: [3] 565 message 1 566 message 2 567 message 3 568 569 ------------------------- 570 571 Queue 程序間通訊 讀寫資料 572 573 # 寫資料程序執行的程式碼 574 def Function_Write(queue_name_data): 575 local_var_queue_data = queue_name_data 576 for value in ["A", "B", "C", "D"]: 577 print("put [%s] to queue " % (value)) 578 local_var_queue_data.put(value) 579 time.sleep(random.random()) 580 581 # 讀取資料程序執行的程式碼: 582 def Function_Read(queue_name_data): 583 local_var_queue_data = queue_name_data 584 while True: 585 if not local_var_queue_data.empty(): 586 value = local_var_queue_data.get(True) 587 print("get [%s] from queue " % (value)) 588 time.sleep(random.random()) 589 else: 590 break 591 592 if __name__ == "__main__": 593 594 # 父程序建立Queue,並傳給各個子程序: 例項 Queue讀寫資料 595 queue_name = Queue() 596 proc_write = Process(target = Function_Write, args = (queue_name,)) 597 proc_read = Process(target = Function_Read, args = (queue_name,)) 598 599 proc_write.start() # 啟動子程序pw,寫入 600 proc_write.join() # 等待pw結束 601 proc_read.start() # 啟動子程序pr,寫入 602 proc_read.join() # 等待pr結束 603 604 # pr程序是死迴圈,無法等待其結束,只能強行終止 605 print("...") 606 print("所有資料都寫入並且讀完") 607 608 result: 609 put [A] to queue 610 put [B] to queue 611 put [C] to queue 612 put [D] to queue 613 get [A] from queue 614 get [B] from queue 615 get [C] from queue 616 get [D] from queue 617 ... 618 所有資料都寫入並且讀完 619 620 ------------------------- 621 # 程序池中通訊 622 def Function_reader(queue_name_data): 623 local_var_queue_data = queue_name_data 624 print(" reader 啟動(%s),父程序為(%s)" % (os.getpid(),os.getppid())) 625 for i in range(local_var_queue_data.qsize()): 626 print(" Function_reader 從Queue獲取到訊息: %s " % (local_var_queue_data.get(True))) 627 628 629 def Function_writer(queue_name_data): 630 local_var_queue_data = queue_name_data 631 print(" writer 啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid())) 632 for var in "dongGe": 633 local_var_queue_data.put(var) 634 635 if __name__ == "__main__": 636 # 程序池中Queue 通訊 637 print("(%s) start" % (os.getpid())) 638 queue_name = Manager().Queue() # 使用Manager中的Queue來初始化 639 pool_name = Pool() 640 # 使用阻塞模式建立程序,這樣就不需要再reader中使用死迴圈 641 pool_name.apply(Function_writer,(queue_name,)) 642 pool_name.apply(Function_reader,(queue_name,)) 643 pool_name.close() 644 pool_name.join() 645 print("(%s) end" % (os.getpid())) 646 647 result: 648 (36900) start 649 writer 啟動(37760),父程序為(36900) 650 reader 啟動(37020),父程序為(36900) 651 Function_reader 從Queue獲取到訊息: d 652 Function_reader 從Queue獲取到訊息: o 653 Function_reader 從Queue獲取到訊息: n 654 Function_reader 從Queue獲取到訊息: g 655 Function_reader 從Queue獲取到訊息: G 656 Function_reader 從Queue獲取到訊息: e 657 (36900) end 658 659 ------------------------- 660 661 ====================================================== 662 663 執行緒 664 665 1.如果多個執行緒執行的都是同一個函式的話, 666 各自之間互不影響,各自執行 667 668 669 670 671 672 ====================================================== 673 ''' 674 675 676 ''' 677 # ============================================================================ 678 # Function: 679 # Explain : 輸入引數 680 # : 輸出引數 681 # ============================================================================ 682 ''' 683 684 #------------------------- 685 # 單程序任務 686 def Function_Test_A(): 687 print("-A1") 688 while True: 689 print("-Function_Test_A---") 690 print("-A2") 691 time.sleep(1) 692 print("-A3") 693 694 #------------------------- 695 # 多程序任務 696 def Function_Test_B(): 697 print("--B1") 698 while True: 699 print("--Function_Test_B---") 700 print("--B2") 701 time.sleep(2) 702 print("--B3") 703 704 #------------------------- 705 # 多程序任務 706 def Function_Test_C(): 707 print("---C1") 708 while True: 709 print("---Function_Test_C---") 710 print("---C2") 711 time.sleep(4) 712 print("---C3") 713 714 #------------------------- 715 # 多程序任務,join() 716 def Function_Test_D(): 717 print("----D1") 718 print("----Function_Test_D----") 719 print("----D2") 720 for i in range(random.randint(1, 5)): 721 print("----D = [%d]" % i) 722 print("----D3") 723 time.sleep(0.1) 724 print("----D4") 725 print("----D5") 726 727 #------------------------- 728 # 繼承Process類 新建程序 729 class Process_Class(Process): 730 # 因為Process類本身也有__init__方法,這個子類相當於重新寫了這個方法。 731 # 但是這樣帶來新的問題,我們並沒有完全的初始化一個Process類, 732 # 所有就不建議使用。 733 # 最好的方法就是將繼承本身傳遞給Process.__init__方法, 734 # 完成這些初始化。 735 736 def __init__(self, interval): 737 Process.__init__(self) 738 self.interval = interval 739 740 # 重新寫了Process類的方法run()方法 741 # start()呼叫run()方法 742 def run(self): 743 print("子程序(%s)開始執行,父程序(%s)" % (os.getpid(), os.getppid())) 744 t_start = time.time() 745 time.sleep(self.interval) 746 t_stop = time.time() 747 print("(%s)執行結束,耗時%0.2f秒" % (os.getpid(), (t_stop - t_start))) 748 749 #------------------------- 750 # 程序池任務 751 def Function_Test_F(num): 752 # random.random()隨機生成0-1之間的浮點數 753 for i in range(2): 754 print("===pid=%d==num=%d="%(os.getpid(), num)) 755 time.sleep(1) 756 757 #------------------------- 758 # 寫資料程序執行的程式碼 759 def Function_Write(queue_name_data): 760 local_var_queue_data = queue_name_data 761 for value in ["A", "B", "C", "D"]: 762 print("put [%s] to queue " % (value)) 763 local_var_queue_data.put(value) 764 time.sleep(random.random()) 765 766 # 讀取資料程序執行的程式碼: 767 def Function_Read(queue_name_data): 768 local_var_queue_data = queue_name_data 769 while True: 770 if not local_var_queue_data.empty(): 771 value = local_var_queue_data.get(True) 772 print("get [%s] from queue " % (value)) 773 time.sleep(random.random()) 774 else: 775 break 776 #------------------------- 777 # 程序池中通訊 778 def Function_reader(queue_name_data): 779 local_var_queue_data = queue_name_data 780 print(" reader 啟動(%s),父程序為(%s)" % (os.getpid(),os.getppid())) 781 for i in range(local_var_queue_data.qsize()): 782 print(" Function_reader 從Queue獲取到訊息: %s " % (local_var_queue_data.get(True))) 783 784 785 def Function_writer(queue_name_data): 786 local_var_queue_data = queue_name_data 787 print(" writer 啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid())) 788 for var in "dongGe": 789 local_var_queue_data.put(var) 790 791 792 #------------------------- 793 794 # ============================================================================ 795 ''' 796 # ============================================================================ 797 # 測試專用 798 # ============================================================================ 799 ''' 800 if __name__ == "__main__": 801 802 # #------------------------- 803 # # 單/多程序 804 # pool_A = Process(target=Function_Test_A) 805 # pool_A.start() # 讓這個程序開始執行例項函式裡的程式碼 806 # pool_B = Process(target=Function_Test_B) 807 # pool_B.start() # 讓這個程序開始執行例項函式裡的程式碼 808 # pool_C = Process(target=Function_Test_C) 809 # pool_C.start() # 讓這個程序開始執行例項函式裡的程式碼 810 # var = 1000 811 # print("M1") 812 # while True: 813 # print("Main=============") 814 # print("M2") 815 # print(var) 816 # var += 1 817 # print("M3") 818 # time.sleep(0.5) 819 # print("M4") 820 821 822 # #------------------------- 823 # #新增join() 824 # pool_D = Process(target=Function_Test_D) 825 # pool_D.start() # 讓這個程序開始執行例項函式裡的程式碼 826 # pool_D.join() # 等待此物件例項子程序結束為止 827 # print("main") 828 829 830 # #------------------------- 831 # #繼承Process類 新建程序 832 # t_start = time.time() 833 # print("當前程式程序(%s)" % (os.getpid())) 834 # # interval = 2s 間隔時間 2秒 835 # pool_E = Process_Class(2) 836 # # target : 表示這個程序例項所呼叫的物件 837 # # start(): 啟動程序例項(建立子程序) 838 # # run() : 如果沒有給定target引數,對這個物件呼叫start()方法時, 839 # # 就將執行物件中的run()方法 840 # # terminate(): 不管任務是否完成,立即終止。 841 # # 對一個不包含target屬性的Process類執行start()就會執行這個類中run() 842 # pool_E.start() # start()呼叫run()方法 843 # pool_E.join() 844 # t_stop = time.time() 845 # print("(%s)執行結束,耗時%0.2f秒" % (os.getpid(), (t_stop - t_start))) 846 847 848 # #------------------------- 849 # # 程序池 850 # # 定義一個程序池,最大程序數 851 # # 3表示:程序池中最多有3個程序一起執行 852 # pool_name = Pool(3) 853 # 854 # for i in range(5): 855 # print("---%d---"%i) 856 # #向程序池中新增任務 857 # #注意:如果新增的任務數量,超過了程序池中程序的最大個數的話, 858 # # 那麼不會導致新增不進入,新增到程序中的任務. 859 # # 如果還沒有被執行的話,那麼此時他們會等待程序池中的, 860 # # 程序完成一個任務之後,會自動的去用剛剛的那個程序, 861 # # 完成當前的新任務 862 # #pool_F.apply_async(要呼叫的目標,(傳遞給目標的引數()元組,)) 863 # pool_name.apply_async(Function_Test_F, (i,)) 864 # 865 # # 關閉程序池,相當於不能夠再次新增新任務了,pool_F不再接收新的請求 866 # pool_name.close() 867 # #等待pool_F中所有子程序執行完成,必須放在close語句之後 868 # pool_name.join() 869 # # 主程序 建立/新增 任務後,主程序預設不會等待程序池中的任務執行完後才結束 870 # # 而是當主程序的任務做完之後 立馬結束, 871 # # 如果這個地方沒join,會導致程序池中的任務不會執行 872 873 874 875 # # #------------------------- 876 # # 初始化一個Queue物件,最多可接受三條put訊息 877 # queue = Queue(4) 878 # queue.put("message 1 ") 879 # queue.put("message 2 ") 880 # print(queue.full()) # False 881 # queue.put("message 3 ") 882 # print(queue.full()) # True 883 # 884 # # 因為訊息佇列已滿,下面的try都會丟擲異常, 885 # # 第一個等待2秒再丟擲異常 886 # # 第二個等待4秒再丟擲異常 887 # try: 888 # queue.put("message 4 ",True,2) 889 # except Exception: 890 # print("message full,current number: [%s]" % (queue.qsize())) 891 # 892 # try: 893 # queue.put("message 5 ",True,2) 894 # except Exception: 895 # print("message full,current number: [%s]" % (queue.qsize())) 896 # 897 # # 推薦的方式,先判斷訊息佇列是否已滿,在寫入 898 # if not queue.full(): 899 # queue.put("message 6 ") 900 # 901 # # 讀取訊息時,先判斷訊息佇列是否為空,在讀取 902 # if not queue.empty(): 903 # print(queue.qsize()) 904 # for var in range(queue.qsize()): 905 # print(queue.get_nowait()) 906 # 907 908 909 # # #------------------------- 910 # # 父程序建立Queue,並傳給各個子程序: 例項 Queue讀寫資料 911 # queue_name = Queue() 912 # proc_write = Process(target = Function_Write, args = (queue_name,)) 913 # proc_read = Process(target = Function_Read, args = (queue_name,)) 914 # 915 # proc_write.start() # 啟動子程序pw,寫入 916 # proc_write.join() # 等待pw結束 917 # proc_read.start() # 啟動子程序pr,寫入 918 # proc_read.join() # 等待pr結束 919 # 920 # # pr程序是死迴圈,無法等待其結束,只能強行終止 921 # print("...") 922 # print("所有資料都寫入並且讀完") 923 924 925 # # #------------------------- 926 # # 程序池中Queue 通訊 927 # print("(%s) start" % (os.getpid())) 928 # queue_name = Manager().Queue() # 使用Manager中的Queue來初始化 929 # pool_name = Pool() 930 # # 使用阻塞模式建立程序,這樣就不需要再reader中使用死迴圈 931 # pool_name.apply(Function_writer,(queue_name,)) 932 # pool_name.apply(Function_reader,(queue_name,)) 933 # pool_name.close() 934 # pool_name.join() 935 # print("(%s) end" % (os.getpid())) 936 937 938 print(" learn finish") 939 940 941
--
finish
--------