1. 程式人生 > >Python分散式任務處理

Python分散式任務處理

Python分散式程序

面對多工需求的時候,thread和process都能實現相應的功能。但更推薦使用process,因為process更穩定。並且process可以在多臺機器上實現分散式的應用,而多執行緒thread只能在一臺機器上使用多個CPU。

那在Python中我們該如何使用分散式程序完成我們的需求?

Python提供了multiprocessing模組。該模組不僅提供實現多程序process,其中子模組manager還支援將多程序分佈到多臺機器上

一臺機器充當任務的排程者(master程序),將任務分發到不同的程序中,通過網路通訊將任務分發到不同的機器上。這裡我們不需要知道manager模組如何將任務進行分發,只需要知道他的用法。

現在我們需要實現一個“不斷輸入數字,計算得出輸入數字的平方”。

如果我們不使用分散式程序,只使用單機多程序。該如何完成?

單機多程序實現

1.初始化一個佇列
2.產生數字的程序(master),並將產生的數字put到全域性的佇列中
3.進行平方根計算的程序(worker),將計算完成的資料輸出

單機多程序實現

分散式多程序實現

當我們使用分散式多程序的時候,一個佇列就不能滿足我們的需求,需要兩個佇列:masterQueueworkerQueue。我們來看一下使用分散式多程序如何完成上述需求。

1.建立master(主機)任務註冊、分發程序

首先我們需要建立一個DistributedMasterProcess.py

(master),它負責相關佇列的初始化。


'分散式程序,使用multiprocessing.manager模組進行多程序佇列的管理'
__author__ = 'click'
__date__ = '2018/7/25 下午1:55'

import time, random, queue
# 1.匯入BaseManager模組(管理Queue,註冊、獲取。連線master)
from multiprocessing.managers import BaseManager

# 2.建立生產佇列master
masterQueue = queue.Queue()
# 建立消費佇列,worker
workerQueue = queue.Queue() # 建立manager管理queue(這一步也需要有的) class QueueManager(BaseManager): pass # 3.使用baseManager將兩個佇列註冊到網路上 QueueManager.register('get_master_queue', callable=lambda: masterQueue) QueueManager.register('get_worker_queue', callable=lambda: workerQueue) # 4.繫結網路埠5000,設定驗證碼'abc' manager = QueueManager(address=('', 5000), authkey=b'abc') # 啟動queue manager.start() # 5.獲取到註冊到網路上的生產、消費佇列 master = manager.get_master_queue() worker = manager.get_worker_queue() # 往生產佇列中新增任務 for i in range(10): n = random.randint(0, 10) print('往master佇列中新增任務 %s' % n) master.put(n) # 準備從消費佇列中取出 print('從消費佇列獲取內容') for i in range(10): r = worker.get(timeout=10) print('消費佇列worker%s' % r) # 關閉manger manager.shutdown()

建立一個master機器上的程式碼如上,按照上述步驟,逐一講解。

1.匯入manager模組

1.匯入BaseManager模組(管理Queue,註冊、獲取。連線master)
from multiprocessing.managers import BaseManager

manager模組已經封裝了相關底層的網路的操作,使用分散式時,匯入我們需要重寫的BaseManager類

2.建立生產佇列master、工作佇列worker

# 2.建立生產佇列master
masterQueue = queue.Queue()
# 建立消費佇列,worker
workerQueue = queue.Queue()

建立相關佇列,這裡大家思考一個問題:如果有不同的任務,我們該如何處理?

3.將相關的佇列註冊到網路。

QueueManager.register('get_master_queue', callable=lambda: masterQueue)
QueueManager.register('get_worker_queue', callable=lambda: workerQueue)

我們只需要執行相關的註冊程式碼,即可在網路中找到我們註冊的佇列。

get_master_queue

在不同的程序中獲取master佇列的介面。

get_worker_queue

在不同的程序中獲取worker佇列的介面。

callable=lambda: workerQueue

初始化的佇列與相應的介面繫結

不管是master、worker佇列都只會是masetr的機器上進行初始化,其他的機器(程序)只使用,不負責初始化。

4.繫結網路埠5000,設定驗證碼’abc’

繫結網路埠5000,設定驗證碼'abc'
manager = QueueManager(address=('', 5000), authkey=b'abc')

manager繫結對外暴露的埠。

autherkey

worker與worker之間連線的祕鑰

5.獲取到註冊到網路上的生產、消費佇列

master = manager.get_master_queue()
worker = manager.get_worker_queue()

使用(3)完成註冊的介面獲取到我們需要使用的相關佇列。之後我們就可以進行任務的新增與獲取。

此時只是完成了masetr(主機)的程式碼編寫。單單是執行master程式碼是不能完成任務的。我們還需要worker(分散式機器)相關的任務處理程式碼。

2.建立worker(分散式機器)任務處理程序

現在我們需要建立任務處理的DistributedWorkerProcess.py(worker)程序。

主要用來註冊相關佇列、連線master(主機)、獲取相關佇列、處理相關佇列資料。


'生產程序'
__author__ = 'click'
__date__ = '2018/7/25 下午3:13'

import random, time, queue, sys
from multiprocessing.managers import BaseManager


# 建立BaseManager

class QueueManger(BaseManager):
    pass


# 1.向網路中註冊生產,消費佇列
QueueManger.register('get_master_queue')
QueueManger.register('get_worker_queue')

# 2.連線到伺服器,也就是執行master_queue的機器

server_addr = '127.0.0.1'
print('連線到服務端 %s' % server_addr)
# 初始化manager
manager = QueueManger(address=(server_addr, 5000), authkey=b'abc')

# 連線到伺服器
manager.connect()
# 3.獲取到master佇列
master = manager.get_master_queue()
# 獲取到消費worker佇列
worker = manager.get_worker_queue()

# 4.從master中獲取任務,並放到worker佇列中

for i in range(10):
    try:
        n = master.get(timeout=1)
        print('worker程序獲取到master佇列中的元素%s' % n)
        r = '%d * %d = %d' % (n, n, n * n)
        time.sleep(1)
        worker.put(r)
    except queue.Empty:
        print('佇列為空')
# 工作程序執行完畢
print('worker執行完成')

主要的流程在master中已經經過詳細的介紹,程式碼中也做了詳細的註釋,相信大家能夠很輕鬆的理解。這裡不做過多的說明。

編寫完了master(主機)、worker(分散式機器)程式碼,我們就可以直接執行我們第一個分散式程序了。

這裡我們要先啟動master(主機)隨後啟動worker(分散式機器)。
結合一下現實情況,人家都還沒準備好,你就要開工了幹了,出了事你是要負責的。

最後執行的結果:

master(主機)

往master佇列中新增任務 0
往master佇列中新增任務 2
往master佇列中新增任務 10
往master佇列中新增任務 6
往master佇列中新增任務 7
往master佇列中新增任務 4
往master佇列中新增任務 6
往master佇列中新增任務 3
往master佇列中新增任務 2
往master佇列中新增任務 4
從消費佇列獲取內容
消費佇列worker0 * 0 = 0
消費佇列worker2 * 2 = 4
消費佇列worker10 * 10 = 100
消費佇列worker6 * 6 = 36
消費佇列worker7 * 7 = 49
消費佇列worker4 * 4 = 16
消費佇列worker6 * 6 = 36
消費佇列worker3 * 3 = 9
消費佇列worker2 * 2 = 4
消費佇列worker4 * 4 = 16

worker(分散式程序)

連線到服務端 127.0.0.1
worker程序獲取到master佇列中的元素0
worker程序獲取到master佇列中的元素2
worker程序獲取到master佇列中的元素10
worker程序獲取到master佇列中的元素6
worker程序獲取到master佇列中的元素7
worker程序獲取到master佇列中的元素4
worker程序獲取到master佇列中的元素6
worker程序獲取到master佇列中的元素3
worker程序獲取到master佇列中的元素2
worker程序獲取到master佇列中的元素4
worker執行完成

通過上述的程式碼,我們不難發現Python提供的分散式多程序介面非常的方便。並且幫我們省去了繁瑣、晦澀的網路部分,掌握起來很簡單。當你有大量的任務的時候使用分散式多程序代替多執行緒,誰讓它還有很多優點呢!

DistributedMasterProcess.py

DistributedWorkerProcess.py