1. 程式人生 > >分散式程序--錯誤解決

分散式程序--錯誤解決

PicklingError:不能pickle <函式在0x02747DB0>:沒有找到main

解決連結

task_master.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random, time, queue
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager

# 傳送任務的佇列:
task_queue = queue.Queue()
# 接收結果的佇列:
result_queue = queue.Queue()

# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager): pass def return_task_queue(): global task_queue return task_queue def return_result_queue(): global result_queue return result_queue def test(): # 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件: # QueueManager.register('get_task_queue', callable=lambda: task_queue)
# QueueManager.register('get_result_queue', callable=lambda: result_queue) QueueManager.register('get_task_queue', callable=return_task_queue) QueueManager.register('get_result_queue', callable=return_result_queue) # 繫結埠5000, 設定驗證碼'abc': manager = QueueManager(address=('127.0.0.1', 5000
), authkey=b'abc') # 啟動Queue: manager.start() # 獲得通過網路訪問的Queue物件: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result佇列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 關閉: manager.shutdown() print('master exit.') if __name__ == '__main__': freeze_support() test()

task_worker.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time, sys, queue
from multiprocessing.managers import BaseManager

# 建立類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網路上獲取Queue,所以註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 連線到伺服器,也就是執行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 埠和驗證碼注意保持與task_master.py設定的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網路連線:
m.connect()
# 獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task佇列取任務,並把結果寫入result佇列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')

同時開啟兩個shell。
先執行task_master.py
然後執行task_worker.py。
(間隔時間長會出現Empty Queue)

C:\Users\K\Desktop\x>python task_master.py
Put task 8323...
Put task 2381...
Put task 3692...
Put task 5614...
Put task 8008...
Put task 3851...
Put task 8920...
Put task 4314...
Put task 5676...
Put task 4413...
Try get results...
Result: 8323 * 8323 = 69272329
Result: 2381 * 2381 = 5669161
Result: 3692 * 3692 = 13630864
Result: 5614 * 5614 = 31516996
Result: 8008 * 8008 = 64128064
Result: 3851 * 3851 = 14830201
Result: 8920 * 8920 = 79566400
Result: 4314 * 4314 = 18610596
Result: 5676 * 5676 = 32216976
Result: 4413 * 4413 = 19474569
master exit.

C:\Users\K\Desktop\x>
C:\Users\K\Desktop\x>python task_worker.py
Connect to server 127.0.0.1...
run task 8323 * 8323...
run task 2381 * 2381...
run task 3692 * 3692...
run task 5614 * 5614...
run task 8008 * 8008...
run task 3851 * 3851...
run task 8920 * 8920...
run task 4314 * 4314...
run task 5676 * 5676...
run task 4413 * 4413...
worker exit.

C:\Users\K\Desktop\x>