1. 程式人生 > >python3 分散式程序(跨機器)BaseManager(multiprocessing.managers)

python3 分散式程序(跨機器)BaseManager(multiprocessing.managers)

A機器負責傳送任務和接受結果:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #task_master.py import  random,time,queue from  multiprocessing.managers  import  BaseManager   task_queue 
=  queue.Queue() result_queue  =  queue.Queue()   class  QueueManager(BaseManager):      pass   if
  __name__  = =  '__main__' :      print ( "master start." )      QueueManager.register( 'get_task_queue' , callable  =  lambda :task_queue)      QueueManager.register( 'get_result_queue' , callable  =  lambda :result_queue)      manager  =  QueueManager(address  =  ( '10.10.100.11' , 9833 ),authkey = b 'abc' )      manager.start()      task  =  manager.get_task_queue()      result  =  manager.get_result_queue()        for  in  range ( 10 ):          =  random.randint( 0 , 1000 )          print ( 'put task %d ...'  %  n)          task.put(n)      print ( 'try get results...' )        for  in  range ( 10 ):          =  result.get(timeout  =  100 )          print ( 'Result:%s'  %  r)      manager.shutdown()      print ( 'master exit.' )

B機器負責處理任務和傳送結果:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 #task_worker.py import  sys,time,queue from  multiprocessing.managers  import  BaseManager   class  QueueManager(BaseManager):      pass   QueueManager.register( 'get_task_queue' ) QueueManager.register( 'get_result_queue' )   server_addr  =  '10.10.100.11' print ( 'connect to server %s...'  %  server_addr)   =  QueueManager(address = (server_addr, 9833 ),authkey = b 'abc' ) m.connect()   task  =  m.get_task_queue() result  =  m.get_result_queue()   for  in  range ( 10 ):      try :          =  task.get(timeout  =  10 )          print ( 'run task %d * %d'  % (n,n))          =  '%d * %d = %d'  % (n,n,n * n)          time.sleep( 1 )          result.put(r)      except  Queue.Empty:          print ( 'task queue is empty' )   print ( 'worker exit'