python日常筆記3--關於分散式任務的分配
阿新 • • 發佈:2019-01-23
分散式任務分配,目前流行的就是master-worker型,也就是有個master程序(或執行緒)來分配任務,其餘的worker程序(或執行緒)來處理任務。我們一般都是採取多程序分散式處理任務,因為多程序比多執行緒要更加的穩定(起碼在python中是這樣)。
首先寫master.py
import queue, random, time from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass task_queue = queue.Queue() result_queu = queue.Queue() res_info = False QueueManager.register('task_queue', callable=lambda :task_queue) QueueManager.register('result_queue', callable=lambda :result_queu) QueueManager.register('res_info', callable=lambda :res_info) manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc') manager.start() task = manager.task_queue() result = manager.result_queue() info = manager.res_info() for i in range(10): n =random.randint(0,1000) print('put %s into the task queue' % n) task.put(n) print('waiting for an task to deal with the data...') for v in range(10): r = result.get(timeout=100) print('the result is : %s',r) manager.shutdown() print('The task process has exited')
我們通過讓繼承BaseManager生成一個我們自己的類QueueManger,用來處理實現分散式伺服器之間的網路通訊。自己需要先定義各個變數,比如這裡的task_queue(任務佇列),然後在QueueManager中註冊,就相當於是寫一個介面函式,方便網路通訊的雙方可以正確接受資料。
之後即使通過QueueMagner例項化一個manager, 這裡需要些必要的引數,ip地址,這裡由於我是本機測試,所以ip地址只要是127.x.x.x都可以,但是,如果要部署到伺服器上,ip地址最好為空'',這樣worker端訪問的ip地址就是伺服器端ip地址。還有一個authkey, 是用來作為密令使用的, 防止第三方惡意破壞程式.
然後就是start, 之後通過manager獲取一些資料,並進行傳遞。
worker.py
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass QueueManager.register('task_queue') QueueManager.register('result_queue') QueueManager.register('res_info') manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc') print('going to connect...') manager.connect() print('connected....') task = manager.task_queue() result = manager.result_queue() info = manager.res_info() for value in range(10): value = task.get(timeout=100) k = value * value print('finish the tast') result.put('the result is %s' % k) print('son task finished') info = True
跟上面一樣,例項化出manger時候, 呼叫manager.connect() 就完成了連線工作