1. 程式人生 > 程式設計 >python分散式計算dispy的使用詳解

python分散式計算dispy的使用詳解

dispy,是用asyncoro實現的分散式平行計算框架。

框架也是非常精簡,只有4個元件,在其原始碼資料夾下可以找到:

dispy.py (client) provides two ways of creating “clusters”: JobCluster when only one instance of dispy may run and SharedJobCluster when multiple instances may run (in separate processes). If JobCluster is used,the scheduler contained within dispy.py will distribute jobs on the server nodes; if SharedJobCluster is used,a separate scheduler (dispyscheduler) must be running.

dispynode.py executes jobs on behalf of dispy. dispynode must be running on each of the (server) nodes that form the cluster.

dispyscheduler.py is needed only when SharedJobCluster is used; this provides a scheduler that can be shared by multiple dispy users.

dispynetrelay.py is needed when nodes are located across different networks; this relays information about nodes on a network to the scheduler. If all the nodes are on same network,there is no need for dispynetrelay - the scheduler and nodes automatically discover each other.

一般情況下,使用dispy和dispynode就已經足夠解決問題了。

簡單使用:

伺服器端:

在伺服器端啟動dispy,監聽並接收所有發來的計算任務,完成計算後將結果返回給客戶端。

開啟python_home/Scripts資料夾,在安裝dispy後會有上面說到的4個dispy元件,以py檔案形式存在。當然你也可以在dispy的原始碼資料夾裡面找到對於的dispynode.py檔案,然後執行

python dispynode.py -c 2 -i 192.168.138.128 -p 51348 -s secret --clean

python dispynode.py -c 2 -i 192.168.8.143 -p 51348 -s secret --clean

這裡192.168.138.128和192.168.8.143是執行計算節點的ip(對伺服器來說相當於localhost),這裡我啟用了兩個節點,每個節點使用2個cpu資源,其中有一個節點是在虛擬機器,一個是本地機器。

-s secret是通訊密碼,客戶端和伺服器連線需要密碼,密碼隨意。

--clean表示每次啟動服務都刪除上次的啟動資訊,如果不刪除,可能會出現pid佔用的錯誤。

客戶端:

在客戶端需要注意的是,傳送到計算節點函式所引用的模組,不能在py檔案的頂層匯入,而需要在函式內匯入。

對於需要匯入自定義模組,比較麻煩一點,需要先例項化函式,才能在計算節點的函式中使用。

# 這些在頂層匯入的模組只能是這個py檔案用
import time
import socket
import numpy
import datetime

# 這個是自定義函式,要在本模組中先例項化才能在計算節點函式中呼叫使用,
# 而本模組的其他地方可以直接呼叫使用
from my_package.my_model import get_time 

# 例項化自定義的函式,注意後面是沒有括號的,否則就是直接呼叫得到返回值了
now = get_time.now

# 計算函式,dispy將這個函式和引數一併傳送到伺服器節點
# 如果函式有多個引數,需要包裝程tuple格式
def compute(args):
 n,array=args # 如果函式有多個引數,需要包裝程tuple格式
 # 看到沒,計算需要的模組是在函式內匯入的
 import time,socket
 time.sleep(3)
 host = socket.gethostname()
 # 這個py檔案中自定義函式,可以直接引用
 total= my_sum(array)
 # 這個now是在其他模組中自定義的函式,需要在頂層先例項化才能引用
 now_time=now()
 return (host,n,total,now_time)

def sum(array):
 # 自定義函式,需要的模組同樣需要在函式內匯入
 import numpy as np
 return np.sum(array)

def loadData():
 # 自定義函式,生成測試資料
 import numpy as np
 data = np.random.rand(20,20)
 data = [line for line in data]
 return data



if __name__ == '__main__':
 import dispy,random
 # 定義兩個計算節點
 nodes = ['192.168.8.143','192.168.138.128']
 # 啟動計算叢集,和伺服器通訊,通訊金鑰是'secret'
 # depends 為依賴函式
 cluster = dispy.JobCluster(compute,nodes=nodes,secret='secret',depends=[sum,now])
 jobs = []

 datas = loadData()
 for n in range(len(datas)):
  # 提交任務
  job = cluster.submit((n,datas[n]))
  job.id = n
  jobs.append(job)
 # print(datetime.datetime.now())
 # cluster.wait() # 等待所有任務完成後才接著往下執行
 # print(datetime.datetime.now())
 for job in jobs:
  host,t = job()
  print('%s executed job %s at %s with %s total=%.2f t=%s' 
    % (host,job.id,job.start_time,t))
  # other fields of 'job' that may be useful:
  # print job.stdout,job.stderr,job.exception,# job.ip_addr,job.end_time
 # 顯示叢集計算狀態
 cluster.stats()

以上這篇python分散式計算dispy的使用詳解就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。