Python分散式任務處理
Python分散式程序
面對多工需求的時候,thread和process都能實現相應的功能。但更推薦使用process,因為process更穩定。並且process可以在多臺機器上實現分散式的應用,而多執行緒thread只能在一臺機器上使用多個CPU。
那在Python中我們該如何使用分散式程序完成我們的需求?
Python提供了multiprocessing模組。該模組不僅提供實現多程序process,其中子模組manager還支援將多程序分佈到多臺機器上。
一臺機器充當任務的排程者(master程序),將任務分發到不同的程序中,通過網路通訊將任務分發到不同的機器上。這裡我們不需要知道manager模組如何將任務進行分發,只需要知道他的用法。
現在我們需要實現一個“不斷輸入數字,計算得出輸入數字的平方”。
如果我們不使用分散式程序,只使用單機多程序。該如何完成?
單機多程序實現
1.初始化一個佇列
2.產生數字的程序(master),並將產生的數字put到全域性的佇列中
3.進行平方根計算的程序(worker),將計算完成的資料輸出
分散式多程序實現
當我們使用分散式多程序的時候,一個佇列就不能滿足我們的需求,需要兩個佇列:masterQueue、workerQueue。我們來看一下使用分散式多程序如何完成上述需求。
1.建立master(主機)任務註冊、分發程序
首先我們需要建立一個DistributedMasterProcess.py
'分散式程序,使用multiprocessing.manager模組進行多程序佇列的管理'
__author__ = 'click'
__date__ = '2018/7/25 下午1:55'
import time, random, queue
# 1.匯入BaseManager模組(管理Queue,註冊、獲取。連線master)
from multiprocessing.managers import BaseManager
# 2.建立生產佇列master
masterQueue = queue.Queue()
# 建立消費佇列,worker
workerQueue = queue.Queue()
# 建立manager管理queue(這一步也需要有的)
class QueueManager(BaseManager):
pass
# 3.使用baseManager將兩個佇列註冊到網路上
QueueManager.register('get_master_queue', callable=lambda: masterQueue)
QueueManager.register('get_worker_queue', callable=lambda: workerQueue)
# 4.繫結網路埠5000,設定驗證碼'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啟動queue
manager.start()
# 5.獲取到註冊到網路上的生產、消費佇列
master = manager.get_master_queue()
worker = manager.get_worker_queue()
# 往生產佇列中新增任務
for i in range(10):
n = random.randint(0, 10)
print('往master佇列中新增任務 %s' % n)
master.put(n)
# 準備從消費佇列中取出
print('從消費佇列獲取內容')
for i in range(10):
r = worker.get(timeout=10)
print('消費佇列worker%s' % r)
# 關閉manger
manager.shutdown()
建立一個master機器上的程式碼如上,按照上述步驟,逐一講解。
1.匯入manager模組
1.匯入BaseManager模組(管理Queue,註冊、獲取。連線master)
from multiprocessing.managers import BaseManager
manager模組已經封裝了相關底層的網路的操作,使用分散式時,匯入我們需要重寫的BaseManager類
2.建立生產佇列master、工作佇列worker。
# 2.建立生產佇列master
masterQueue = queue.Queue()
# 建立消費佇列,worker
workerQueue = queue.Queue()
建立相關佇列,這裡大家思考一個問題:如果有不同的任務,我們該如何處理?
3.將相關的佇列註冊到網路。
QueueManager.register('get_master_queue', callable=lambda: masterQueue)
QueueManager.register('get_worker_queue', callable=lambda: workerQueue)
我們只需要執行相關的註冊程式碼,即可在網路中找到我們註冊的佇列。
get_master_queue
在不同的程序中獲取master佇列的介面。
get_worker_queue
在不同的程序中獲取worker佇列的介面。
callable=lambda: workerQueue
初始化的佇列與相應的介面繫結
注:不管是master、worker佇列都只會是masetr的機器上進行初始化,其他的機器(程序)只使用,不負責初始化。
4.繫結網路埠5000,設定驗證碼’abc’
繫結網路埠5000,設定驗證碼'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')
manager繫結對外暴露的埠。
autherkey
worker與worker之間連線的祕鑰
5.獲取到註冊到網路上的生產、消費佇列
master = manager.get_master_queue()
worker = manager.get_worker_queue()
使用(3)完成註冊的介面獲取到我們需要使用的相關佇列。之後我們就可以進行任務的新增與獲取。
此時只是完成了masetr(主機)的程式碼編寫。單單是執行master程式碼是不能完成任務的。我們還需要worker(分散式機器)相關的任務處理程式碼。
2.建立worker(分散式機器)任務處理程序
現在我們需要建立任務處理的DistributedWorkerProcess.py(worker)程序。
主要用來註冊相關佇列、連線master(主機)、獲取相關佇列、處理相關佇列資料。
'生產程序'
__author__ = 'click'
__date__ = '2018/7/25 下午3:13'
import random, time, queue, sys
from multiprocessing.managers import BaseManager
# 建立BaseManager
class QueueManger(BaseManager):
pass
# 1.向網路中註冊生產,消費佇列
QueueManger.register('get_master_queue')
QueueManger.register('get_worker_queue')
# 2.連線到伺服器,也就是執行master_queue的機器
server_addr = '127.0.0.1'
print('連線到服務端 %s' % server_addr)
# 初始化manager
manager = QueueManger(address=(server_addr, 5000), authkey=b'abc')
# 連線到伺服器
manager.connect()
# 3.獲取到master佇列
master = manager.get_master_queue()
# 獲取到消費worker佇列
worker = manager.get_worker_queue()
# 4.從master中獲取任務,並放到worker佇列中
for i in range(10):
try:
n = master.get(timeout=1)
print('worker程序獲取到master佇列中的元素%s' % n)
r = '%d * %d = %d' % (n, n, n * n)
time.sleep(1)
worker.put(r)
except queue.Empty:
print('佇列為空')
# 工作程序執行完畢
print('worker執行完成')
主要的流程在master中已經經過詳細的介紹,程式碼中也做了詳細的註釋,相信大家能夠很輕鬆的理解。這裡不做過多的說明。
編寫完了master(主機)、worker(分散式機器)程式碼,我們就可以直接執行我們第一個分散式程序了。
這裡我們要先啟動master(主機)隨後啟動worker(分散式機器)。
結合一下現實情況,人家都還沒準備好,你就要開工了幹了,出了事你是要負責的。
最後執行的結果:
master(主機)
往master佇列中新增任務 0
往master佇列中新增任務 2
往master佇列中新增任務 10
往master佇列中新增任務 6
往master佇列中新增任務 7
往master佇列中新增任務 4
往master佇列中新增任務 6
往master佇列中新增任務 3
往master佇列中新增任務 2
往master佇列中新增任務 4
從消費佇列獲取內容
消費佇列worker0 * 0 = 0
消費佇列worker2 * 2 = 4
消費佇列worker10 * 10 = 100
消費佇列worker6 * 6 = 36
消費佇列worker7 * 7 = 49
消費佇列worker4 * 4 = 16
消費佇列worker6 * 6 = 36
消費佇列worker3 * 3 = 9
消費佇列worker2 * 2 = 4
消費佇列worker4 * 4 = 16
worker(分散式程序)
連線到服務端 127.0.0.1
worker程序獲取到master佇列中的元素0
worker程序獲取到master佇列中的元素2
worker程序獲取到master佇列中的元素10
worker程序獲取到master佇列中的元素6
worker程序獲取到master佇列中的元素7
worker程序獲取到master佇列中的元素4
worker程序獲取到master佇列中的元素6
worker程序獲取到master佇列中的元素3
worker程序獲取到master佇列中的元素2
worker程序獲取到master佇列中的元素4
worker執行完成
通過上述的程式碼,我們不難發現Python提供的分散式多程序介面非常的方便。並且幫我們省去了繁瑣、晦澀的網路部分,掌握起來很簡單。當你有大量的任務的時候使用分散式多程序代替多執行緒,誰讓它還有很多優點呢!