python3 分散式程序(跨機器)BaseManager(multiprocessing.managers)
阿新 • • 發佈:2018-12-27
A機器負責傳送任務和接受結果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
#task_master.py
import
random,time,queue
from
multiprocessing.managers
import
BaseManager
task_queue =
queue.Queue()
result_queue
=
queue.Queue()
class
QueueManager(BaseManager):
pass
if __name__
=
=
'__main__'
:
print
(
"master start."
)
QueueManager.register(
'get_task_queue'
,
callable
=
lambda
:task_queue)
QueueManager.register(
'get_result_queue'
,
callable
=
lambda
:result_queue)
manager
=
QueueManager(address
=
(
'10.10.100.11'
,
9833
),authkey
=
b
'abc'
)
manager.start()
task
=
manager.get_task_queue()
result
=
manager.get_result_queue()
for
i
in
range
(
10
):
n
=
random.randint(
0
,
1000
)
print
(
'put task %d ...'
%
n)
task.put(n)
print
(
'try get results...'
)
for
i
in
range
(
10
):
r
=
result.get(timeout
=
100
)
print
(
'Result:%s'
%
r)
manager.shutdown()
print
(
'master exit.'
)
|
B機器負責處理任務和傳送結果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
#task_worker.py
import
sys,time,queue
from
multiprocessing.managers
import
BaseManager
class
QueueManager(BaseManager):
pass
QueueManager.register(
'get_task_queue'
)
QueueManager.register(
'get_result_queue'
)
server_addr
=
'10.10.100.11'
print
(
'connect to server %s...'
%
server_addr)
m
=
QueueManager(address
=
(server_addr,
9833
),authkey
=
b
'abc'
)
m.connect()
task
=
m.get_task_queue()
result
=
m.get_result_queue()
for
i
in
range
(
10
):
try
:
n
=
task.get(timeout
=
10
)
print
(
'run task %d * %d'
%
(n,n))
r
=
'%d * %d = %d'
%
(n,n,n
*
n)
time.sleep(
1
)
result.put(r)
except
Queue.Empty:
print
(
'task queue is empty'
)
print
(
'worker exit'
)
|