1. 程式人生 > >python分散式程序

python分散式程序

 

轉自廖雪峰的python教程,因為是在windows上程式設計,所以程式碼會和原文有一些出入。

master.py

from multiprocessing.managers import BaseManager
import random, time, queue

task_queue = queue.Queue() #任務佇列
result_queue = queue.Queue()    #結果佇列

class MyManager(BaseManager):
    pass

def return_task_queue():
    global task_queue
    return task_queue
def return_result_queue():
    global result_queue
    return result_queue

if __name__ == '__main__':
    #釋出到網路上
    MyManager.register('tasks', callable= return_task_queue)
    MyManager.register('results', callable= return_result_queue)

    manager = MyManager(address=('127.0.0.1',5000), authkey=b'abc')  #在本機5000埠上建立一個Manager 口令為abc
    manager.start()

    task = manager.tasks()          #不能直接去操作上面的task_queue了,需要通過網路獲取
    result = manager.results()

    for i in range(1,10):
        print('釋出任務,%d' % i)
        task.put(i)

    for i in range(1,10):
        r = result.get(timeout=10)
        print('結果',r)

    manager.shutdown()

worker.py

from multiprocessing.managers import BaseManager
import random, time, queue

class MyManager(BaseManager):
    pass

MyManager.register('tasks')         #獲取master釋出的任務
MyManager.register('results')

manager = MyManager(address=('127.0.0.1', 5000), authkey=b'abc')
manager.connect() #連線到master去

task = manager.tasks()
result = manager.results()

for i in range(1,10):
    try:
        n = task.get(timeout=1)
        print('執行任務,',n)
        n = n * n
        result.put(n)
    except queue.Empty:
        print('task queue is Empty')

先執行master.py  在master的命令列就會顯示釋出任務,然後等著worker.py處理返回結果

Output:
釋出任務,1
釋出任務,2
釋出任務,3
釋出任務,4
釋出任務,5
釋出任務,6
釋出任務,7
釋出任務,8
釋出任務,9

再執行worker.py 在worker的命令列會顯示以下輸出。

Output:
執行任務, 1
執行任務, 2
執行任務, 3
執行任務, 4
執行任務, 5
執行任務, 6
執行任務, 7
執行任務, 8
執行任務, 9

然後master.py也能繼續往下走,獲取計算結果了。 

Output:
釋出任務,1
釋出任務,2
釋出任務,3
釋出任務,4
釋出任務,5
釋出任務,6
釋出任務,7
釋出任務,8
釋出任務,9
結果 1
結果 4
結果 9
結果 16
結果 25
結果 36
結果 49
結果 64
結果 81