python分散式版本二分法驗證
阿新 • • 發佈:2021-08-17
- 思路1,master等待遠端返回結果
from multiprocessing import Pool import os, time, random # 在遠端執行的任務程序,,hang住等待返回 def long_time_task(ip,url): print('Run task %s-%s (%s)...' % (ip, url, os.getpid())) start = time.time() time.sleep(random.random() * 30) end = time.time() print('Task %s runs %0.2f seconds.' % (ip, (end - start))) return True # 版本驗證通過返回true if __name__=='__main__': print('Parent process %s.' % os.getpid()) url_list = [1,2,3,4,5,6,7,8] # 版本list,stat[0],end[-1] p = Pool(4) # 遠端執行的節點數 # 根據節點數二分法取出節點數個版本待驗證 for i in range(5): p.apply_async(long_time_task, args=(i,'url:1')) # 傳入ip地址,和版本url print('Waiting for all subprocesses done...') p.close() p.join() # 所有程序都完成後,根據返回結果更新二分法的start和end print('All subprocesses done.')
- 思路2,QueueManager通訊
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import random, time, queue from multiprocessing.managers import BaseManager # 傳送任務的佇列: task_queue = queue.Queue() # 接收結果的佇列: result_queue = queue.Queue() # 增加如下的函式以適應windows ,【PicklingError】 def task_queue_fun(): return task_queue def result_queue_fun(): return result_queue # return queue.Queue() # 這樣不行 # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass if __name__ == '__main__': # 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件: # 修改如下的函式,以適應windows環境 # QueueManager.register('get_task_queue', callable=lambda: task_queue) # QueueManager.register('get_result_queue', callable=lambda: result_queue) QueueManager.register('get_task_queue', callable=task_queue_fun) QueueManager.register('get_result_queue', callable=result_queue_fun) # 繫結埠5000, 設定驗證碼'abc': manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') # 啟動Queue: manager.start() # 獲得通過網路訪問的Queue物件: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result佇列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 關閉: manager.shutdown() print('master exit.')