Implementing Distributed Processes in Python

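Preface

I was working through the distributed process module in Liao Xuefeng's tutorial, but running its code as given produced a series of errors. After resolving them one by one I got it running, and I am sharing the fixes here.

This post is a record of what I learned from Liao Xuefeng's tutorial and is intended only as personal study notes. Please support the original author.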

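Operating system

So far I have only tried this on Windows, so this post only records fixes for the problems that appear on Windows. I will add notes for other systems once I have tested there.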

Source code

I started by running the tutorial's task_master.py source as is, which failed with the following error:

E:\code\demo\venv\Scripts\python.exe E:/code/demo/tasks/demo.py
Traceback (most recent call last):
  File "E:/code/demo/tasks/demo.py", line 21, in <module>
    manager.start()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\managers.py", line 543, in start
    self._process.start()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x00000000003EC1E0>: attribute lookup <lambda> on __main__ failed

Process finished with exit code 1

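Replacing the lambda functions

As the traceback shows, the error is raised at manager.start(). On Windows that call starts a child process and pickles the manager's state to send to it (reduction.dump(process_obj, to_child) in the traceback), and lambda functions cannot be pickled. The registered callbacks therefore have to be changed to two regular functions: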

import random, time, queue
from multiprocessing.managers import BaseManager

# Queue for sending tasks:
task_queue = queue.Queue()
# Queue for receiving results:
result_queue = queue.Queue()

# QueueManager derived from BaseManager:
class QueueManager(BaseManager):
    pass

# Regular function instead of a lambda, which pickle cannot serialize.
def get_task_queue():
    global task_queue
    return task_queue

# Regular function instead of a lambda, which pickle cannot serialize.
def get_result_queue():
    # NOTE: this returns task_queue, not result_queue; the slip only shows up
    # once the script gets far enough to read results.
    global task_queue
    return task_queue

# Register both queues on the network; the callable argument ties each name to a Queue object:
QueueManager.register('get_task_queue', callable=get_task_queue)
QueueManager.register('get_result_queue', callable=get_result_queue)
# Bind to port 5000 and set the authkey to 'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# Start the manager (and its queues):
manager.start()
# Get the queue objects that are accessed over the network:
task = manager.get_task_queue()
result = manager.get_result_queue()
# Put a few tasks in:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# Read the results from the result queue:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# Shut down:
manager.shutdown()
print('master exit.')
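
The pickling limitation can also be seen in isolation, without any manager involved. The following is a minimal sketch; can_pickle and named_callback are hypothetical names used only for this illustration, not part of the tutorial's code.

```python
import pickle


def named_callback():
    """A regular module-level function; pickle stores it by its qualified name."""
    return "task_queue"


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


# A lambda has no importable name, so pickle has nothing to look up.
print("lambda:", can_pickle(lambda: "task_queue"))   # lambda: False
# Built-in functions are pickled by name and work fine.
print("builtin len:", can_pickle(len))               # builtin len: True
# A named function defined at module level is also pickled by name.
print("named function:", can_pickle(named_callback))
```

Functions are pickled by reference (module plus name), which is why giving the callback a real name at module level is enough to get past manager.start().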

Run the master script again:

E:\code\demo\venv\Scripts\python.exe E:/code/demo/tasks/demo.py
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 114, in _main
    prepare(preparation_data)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
    run_name="__mp_main__")
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "E:\code\demo\tasks\demo.py", line 31, in <module>
    manager.start()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\managers.py", line 543, in start
    self._process.start()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 33, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

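The previous error is gone, but a new one has appeared.

Fixing the __name__ == '__main__' problem

The message says that on Windows the main module needs the idiom if __name__ == '__main__':. Its usual effect is that the code beneath it runs only when the file is executed directly. That looks redundant, since I was executing the file directly, but the traceback shows why it is needed: Windows has no fork, so multiprocessing starts the child by launching a new interpreter that re-runs the main script under the name __mp_main__ (see run_name="__mp_main__" in the traceback). Without the guard, the child reaches manager.start() again and tries to start yet another process. With the guard added, the script looks like this: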

import random, time, queue
from multiprocessing.managers import BaseManager

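# Queue for sending tasks:
task_queue = queue.Queue()
# Queue for receiving results:
result_queue = queue.Queue()

# QueueManager derived from BaseManager:
class QueueManager(BaseManager):
    pass

# Regular function instead of a lambda, which pickle cannot serialize.
def get_task_queue():
    global task_queue
    return task_queue


# Regular function instead of a lambda, which pickle cannot serialize.
def get_result_queue():
    # NOTE: this returns task_queue, not result_queue; the slip only shows up
    # once the script gets far enough to read results.
    global task_queue
    return task_queue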

if __name__ == "__main__":
    # Register both queues on the network; the callable argument ties each name to a Queue object:
    QueueManager.register('get_task_queue', callable=get_task_queue)
    QueueManager.register('get_result_queue', callable=get_result_queue)
    # Bind to port 5000 and set the authkey to 'abc':
    manager = QueueManager(address=('', 5000), authkey=b'abc')
    # Start the manager (and its queues):
    manager.start()
    # Get the queue objects that are accessed over the network:
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # Put a few tasks in:
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)
    # Read the results from the result queue:
    print('Try get results...')
    for i in range(10):
        r = result.get(timeout=10)
        print('Result: %s' % r)
    # Shut down:
    manager.shutdown()
    print('master exit.')
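
Why the guard matters can be seen without starting any process. The sketch below imitates what the spawned child does according to the traceback, namely running the main script through runpy.run_path with run_name="__mp_main__". The script text and the run_as helper are made up for this illustration.

```python
import os
import runpy
import tempfile
import textwrap

# A tiny stand-in for the master script: one unguarded statement, one guarded.
SCRIPT = textwrap.dedent('''
    executed = ["top-level code"]
    if __name__ == "__main__":
        executed.append("guarded block")
''')


def run_as(run_name):
    """Run SCRIPT from a temporary file under the given module name."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo_script.py")
        with open(path, "w") as f:
            f.write(SCRIPT)
        # run_path returns the globals of the executed script.
        return runpy.run_path(path, run_name=run_name)["executed"]


print(run_as("__main__"))     # ['top-level code', 'guarded block']
print(run_as("__mp_main__"))  # ['top-level code']
```

Everything outside the guard runs in both cases, so anything that starts a process (such as manager.start()) has to sit inside it.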

Running the guarded master script produces a different error:

Traceback (most recent call last):
  File "E:/code/demo/tasks/demo.py", line 35, in <module>
    task = manager.get_task_queue()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\managers.py", line 701, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\managers.py", line 584, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\connection.py", line 492, in Client
    c = SocketClient(address)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\connection.py", line 619, in SocketClient
    s.connect(address)
OSError: [WinError 10049] The requested address is not valid in its context

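Specifying the IP address

This error occurs because no IP address was given in the manager's address:

manager = QueueManager(address=('', 5000), authkey=b'abc')

Only the port is specified here. An empty host is acceptable for binding a server socket (it means all interfaces), but the traceback shows the failure in s.connect(address): the master also connects to its own manager through that address, and Windows does not appear to accept an unspecified address as a connection target.

Change the line as follows: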

manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
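
If the bind address comes from configuration, the same substitution could be made in code. This is only a sketch: connectable_address is a hypothetical helper, and it assumes the client runs on the same machine as the manager.

```python
def connectable_address(bind_address):
    """Map a bind address to one that a local client can connect to.

    Hypothetical helper: '' and '0.0.0.0' mean "all interfaces" when binding
    but are not usable as a destination on Windows, so loopback is used instead.
    """
    host, port = bind_address
    if host in ("", "0.0.0.0"):
        host = "127.0.0.1"
    return (host, port)


print(connectable_address(("", 5000)))              # ('127.0.0.1', 5000)
print(connectable_address(("192.168.1.10", 5000)))  # ('192.168.1.10', 5000)
```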

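Running the master again finally produced no error. However, when I ran the worker after the master had finished, the worker failed with the following error: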

Traceback (most recent call last):
  File "E:/code/demo/tasks/task-worder.py", line 18, in <module>
    m.connect()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\managers.py", line 512, in connect
    conn = Client(self._address, authkey=self._authkey)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\connection.py", line 492, in Client
    c = SocketClient(address)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python37\lib\multiprocessing\connection.py", line 619, in SocketClient
    s.connect(address)
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
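
The worker script itself is not listed in this post. For orientation, here is a minimal sketch of what such a worker might look like, assuming it squares each number it receives and uses the same address and authkey as the master. process_tasks and main are names chosen for this sketch, not the tutorial's.

```python
import queue
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


def process_tasks(task, result, max_tasks=10, timeout=1):
    """Take up to max_tasks numbers from task and put 'n * n = ...' strings into result."""
    done = 0
    for _ in range(max_tasks):
        try:
            n = task.get(timeout=timeout)
        except queue.Empty:
            print('task queue is empty.')
            break
        print('run task %d * %d...' % (n, n))
        result.put('%d * %d = %d' % (n, n, n * n))
        done += 1
    return done


def main(server_addr='127.0.0.1', port=5000, authkey=b'abc'):
    # The worker registers only the names; the callables live on the master.
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')
    m = QueueManager(address=(server_addr, port), authkey=authkey)
    m.connect()  # raises ConnectionRefusedError if no master is listening
    process_tasks(m.get_task_queue(), m.get_result_queue())
    print('worker exit.')
```

When used as a script, main() would be called from inside an if __name__ == '__main__': block. The m.connect() call is the one that fails in the traceback above when the master is no longer running.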

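After queuing the tasks, make the program wait before fetching results

The cause is clear: the worker cannot connect to the master server. Looking back at the master's output, it did not behave as in the tutorial, where the master puts 10 tasks on the queue and then stops and waits for them to be processed. Instead it ran straight through and exited, so nothing was listening by the time the worker started. The reason it did not wait is that get_result_queue() in the listings above returns task_queue, so result.get() immediately hands the master its own ten tasks back as results. Returning result_queue removes that cause.

The change I made at the time was to have the program wait a while after putting the tasks, before fetching results: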

import random, time, queue
from multiprocessing.managers import BaseManager

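# Queue for sending tasks:
task_queue = queue.Queue()
# Queue for receiving results:
result_queue = queue.Queue()

# QueueManager derived from BaseManager:
class QueueManager(BaseManager):
    pass

# Regular function instead of a lambda, which pickle cannot serialize.
def get_task_queue():
    global task_queue
    return task_queue


# Regular function instead of a lambda, which pickle cannot serialize.
# It must return result_queue (not task_queue), otherwise the master
# reads its own tasks back as results.
def get_result_queue():
    global result_queue
    return result_queue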

if __name__ == "__main__":
    # Register both queues on the network; the callable argument ties each name to a Queue object:
    QueueManager.register('get_task_queue', callable=get_task_queue)
    QueueManager.register('get_result_queue', callable=get_result_queue)
    # Bind to 127.0.0.1, port 5000, and set the authkey to 'abc':
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
    # Start the manager (and its queues):
    manager.start()
    # Get the queue objects that are accessed over the network:
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # Put a few tasks in:
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d...' % n)
        task.put(n)
    # Give the worker time to connect before reading results:
    time.sleep(10)
    # Read the results from the result queue:
    print('Try get results...')
    for i in range(10):
        r = result.get(timeout=10)
        print('Result: %s' % r)
    # Shut down:
    manager.shutdown()
    print('master exit.')

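Here I only wait 10 seconds; as long as the worker is started within those 10 seconds, everything works. I will skip the screenshots, so go and enjoy the thrill of seeing it succeed yourself.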
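
A fixed sleep works, but it always costs the full 10 seconds. One possible alternative is to wait on the result queue itself with an overall deadline. The sketch below assumes result is a queue separate from the task queue and supports get(timeout=...) like queue.Queue. collect_results and its deadline_seconds parameter are hypothetical, not part of the tutorial.

```python
import queue
import time


def collect_results(result, expected, deadline_seconds=30.0):
    """Collect up to `expected` results, giving up once the overall deadline passes."""
    deadline = time.monotonic() + deadline_seconds
    results = []
    while len(results) < expected:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            # Block only for the time left, so the total wait stays bounded.
            results.append(result.get(timeout=remaining))
        except queue.Empty:
            break
    return results
```

In the master, something like collect_results(result, 10) could then stand in for the sleep plus the fixed loop of result.get(timeout=10) calls, returning as soon as the worker has delivered everything.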