Python分散式程序報錯:pickle模組不能序列化lambda函式
阿新 • • 發佈:2019-02-06
今天在學習到廖老師Python教程的分散式程序時,遇到了一個錯誤:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001710FDC2EA0>: attribute lookup <lambda> on __main__ failed(pickle模組不能序列化lambda函式)
程式碼如下:
#!/usr/bin/env python # _*_ coding:utf-8 _*_ import random, queue from multiprocessing.managers import BaseManager # 傳送任務的佇列 task_queue = queue.Queue() # 接收結果的佇列 result_queue = queue.Queue() # 從BaseManager繼承的QueueManager class QueueManager(BaseManager): pass # 吧兩個Queue都註冊到網路上,callable引數關聯了Queue物件 QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 繫結埠5000,設定驗證碼abc manager = QueueManager(address=('', 5000), authkey=b'abc') # 啟動queue manager.start() # 獲得通過網路訪問的Queue物件 task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務 for i in range(10): n = random.randint(0, 1000) print('新增任務 %d' % n) task.put(n) # 從result佇列讀取結果 print('嘗試獲取結果') for i in range(10): r = result.get(timeout=10) print('結果是:%s' % r) # 關閉 manager.shutdown() print('master exit')
在教程中我記得有關pickle的事兒(有印象看來思想還在線上,哈哈),翻了一下,看到:
原來是系統問題造成的,那麼,如何解決呢?在教程中我也看到,遇到這樣的情況,需要我們自己定義函式,實現序列化。
所以對程式碼稍加修改,定義兩個函式return_task_queue和return_result_queue實現序列化:
#!/usr/bin/env python # _*_ coding:utf-8 _*_ import random, queue from multiprocessing.managers import BaseManager # 傳送任務的佇列 task_queue = queue.Queue() # 接收結果的佇列 result_queue = queue.Queue() def return_task_queue(): global task_queue return task_queue def return_result_queue(): global result_queue return result_queue # 從BaseManager繼承的QueueManager class QueueManager(BaseManager): pass if __name__ == '__main__': # 吧兩個Queue都註冊到網路上,callable引數關聯了Queue物件 QueueManager.register('get_task_queue', callable=return_task_queue) QueueManager.register('get_result_queue', callable=return_result_queue) # 繫結埠5000,設定驗證碼abc manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') # 啟動queue manager.start() # 獲得通過網路訪問的Queue物件 task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務 for i in range(10): n = random.randint(0, 1000) print('新增任務 %d' % n) task.put(n) # 從result佇列讀取結果 print('嘗試獲取結果') for i in range(10): r = result.get(timeout=10) print('結果是:%s' % r) # 關閉 manager.shutdown() print('master exit')
執行結果: