1. 程式人生 > >python日常筆記3--關於分散式任務的分配

python日常筆記3--關於分散式任務的分配

分散式任務分配,目前流行的就是master-worker型,也就是有個master程序(或執行緒)來分配任務,其餘的worker程序(或執行緒)來處理任務。我們一般都是採取多程序分散式處理任務,因為多程序比多執行緒要更加的穩定(起碼在python中是這樣)。

首先寫master.py

import queue, random, time
from multiprocessing.managers import  BaseManager

class QueueManager(BaseManager):
    pass




task_queue = queue.Queue()
result_queu = queue.Queue()
res_info = False

QueueManager.register('task_queue', callable=lambda :task_queue)
QueueManager.register('result_queue', callable=lambda :result_queu)
QueueManager.register('res_info', callable=lambda :res_info)

manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')
manager.start()

task = manager.task_queue()
result = manager.result_queue()
info = manager.res_info()
for i in range(10):
    n =random.randint(0,1000)
    print('put %s into the task queue' % n)
    task.put(n)


print('waiting for an task to deal with the data...')

for v in range(10):
    r = result.get(timeout=100)
    print('the result is : %s',r)


manager.shutdown()
print('The task process has exited')

我們通過讓繼承BaseManager生成一個我們自己的類QueueManger,用來處理實現分散式伺服器之間的網路通訊。自己需要先定義各個變數,比如這裡的task_queue(任務佇列),然後在QueueManager中註冊,就相當於是寫一個介面函式,方便網路通訊的雙方可以正確接受資料。

之後即使通過QueueMagner例項化一個manager, 這裡需要些必要的引數,ip地址,這裡由於我是本機測試,所以ip地址只要是127.x.x.x都可以,但是,如果要部署到伺服器上,ip地址最好為空'',這樣worker端訪問的ip地址就是伺服器端ip地址。還有一個authkey, 是用來作為密令使用的, 防止第三方惡意破壞程式.

然後就是start, 之後通過manager獲取一些資料,並進行傳遞。

worker.py

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('task_queue')
QueueManager.register('result_queue')
QueueManager.register('res_info')

manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')
print('going to connect...')
manager.connect()
print('connected....')
task = manager.task_queue()
result = manager.result_queue()
info = manager.res_info()

for value in range(10):
    value = task.get(timeout=100)
    k = value * value
    print('finish the tast')
    result.put('the result is %s' % k)

print('son task finished')
info = True

跟上面一樣,例項化出manger時候, 呼叫manager.connect() 就完成了連線工作