Python RPC 遠端呼叫指令碼之 RPyC 實踐
最近有個監控需求,需要遠端執行叢集每個節點上的指令碼,並獲取指令碼執行結果,為了安全起見不需要賬號密碼登陸節點主機,要求只需要呼叫遠端指令碼模組的方法就能實現。
總結下python進行遠端呼叫指令碼方法:
- 登陸主機執行指令碼,python模組支援如 pssh、pexpect、paramiko、ansible
- 以遠端方法呼叫(不需要登陸主機),python模組 rpyc,支援分散式
- socket 方式,稍顯複雜,需要熟悉網路協議,起點比較高
rpyc支援遠端呼叫、分散式計算,以較少程式碼量實現複雜socket程式設計,本文主要介紹 rpyc 並用它來實現一個 demo。
以程式碼方式介紹:
需求:分別執行叢集每個節點上 server 端的指令碼,並返回執行結果給 client 端
Monitor_RPC_Client.py #!/usr/bin/env python # coding=utf-8 # 測試utf-8編碼 # python exec_cmd.py "ls -lrt /opt/data1/logs/nginx/pc/track/`date +'%Y%m%d'`|awk '{s+=$5}END{print s}'" # python exec_cmd.py "wc -l /opt/data1/logs/nginx/pc/track/`date +'%Y%m%d'`/*|awk '{s+=$1}END{print s}'" import sys reload(sys) sys.setdefaultencoding('utf-8') import rpyc from pyUtil import * from multiprocessing.dummy import Pool as ThreadPool hostDict = { '192.168.1.216': 12345, '192.168.1.217': 12345, '192.168.1.218': 12345 } localResultDict = {} def rpc_client(host_port_cmd): host = host_port_cmd[0] port = host_port_cmd[1] cmd = host_port_cmd[2] c = rpyc.connect(host, port) result = c.root.exposed_execCmd(cmd) localResultDict[host] = result c.close() def exec_cmd(cmd_str): host_port_list = [] for (host, port) in hostDict.items(): host_port_list.append((host, port, cmd_str)) pool = ThreadPool(len(hostDict)) results = pool.map(rpc_client, host_port_list) pool.close() pool.join() for ip, result in sorted(localResultDict.iteritems(), key=lambda d: int(d[0].replace(".", ""))): print ip + ":t" + result if __name__ == "__main__": if len(sys.argv) == 2 and sys.argv[1] != "-h": print "======================" print " Your command is:t" + sys.argv[1] print "======================" cmd_str = sys.argv[1] else: print """ 該指令碼可以在叢集中批量執行任意命令並返回結果,但需注意以下幾點: 1、命令請先單機測試通過,然後提交給指令碼批量執行; 2、不要執行 rm 等危險 || 極其耗時 || 影響機器效能的命令; 3、命令請用雙引號引起來,另外命令中有 $ 符號需要轉義成 $ 否則會被 Shell 當做變數解析掉,具體請參見下面的例子。 Usage && for example: python exec_cmd.py "ls -lrt /opt/data1/logs/nginx/pc/track/{}|awk '{{s+=$5}}END{{print s}}'" python exec_cmd.py "wc -l /opt/data1/logs/nginx/pc/track/{}/*|awk '{{s+=$1}}END{{print s}}'" """.format(yesterday, yesterday) sys.exit(1) exec_cmd(cmd_str) Monitor_RPC_Server.py #!/usr/bin/env python # coding=utf-8 # 測試utf-8編碼 # cd /opt/script/rpcMonitorFlume # pkill -f flumeFileMonitor_RPC_Server.py # nohup python -u flumeFileMonitor_RPC_Server.py >> logs/flumeFileMonitor_RPC_Server.log 2>&1 & import sys reload(sys) sys.setdefaultencoding('utf-8') import os, commands, glob, re import datetime from rpyc import Service from rpyc.utils.server import ThreadedServer from pyUtil import getNowTime, get_ip_address class remote_call_func(Service): def on_connect(self): print "[{0}]t--------------<<< on_connect".format(getNowTime()) def on_disconnect(self): print "[{0}]t-------------->>> on_disconnect".format(getNowTime()) def exposed_execCmd(self, cmd): exitCode, execResult = commands.getstatusoutput(cmd) nowTime = (datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S") print "[{0}] → {1} → {2}".format(nowTime, cmd, execResult) return execResult rpycServer = ThreadedServer(remote_call_func, hostname=get_ip_address('eth0'), port=11111, auto_register=False) rpycServer.start()
官方文件中類似例子很多,就不詳細介紹了,需注意3點:
- server端定義方法需要被client呼叫,必須定義以exposed 開頭的方法,不然會報錯AttributeError: ‘remote_call_script’ object has no attribute ‘exposed_iamshell’
- server端預設不設認證機制,如果需要認證有推薦兩種方法: ThreadedServer的authenticator引數與SSL模組
- pip install rpyc ,如果 import rpyc 報錯則 yum install openssl-devel,然後重新編譯、安裝 python
當然還需要考慮很多異常處理,如超時、驗證失敗等。
Refer:
[1] python遠端呼叫指令碼(一)
http://rpyc.readthedocs.org/en/latest/tutorial.html
[2] python學習——python中執行shell命令
http://zhou123.blog.51cto.com/4355617/1312791
[3] celery實現任務統一收集、分發執行
http://blog.csdn.net/vintage_1/article/details/47664187
[4] Timeout function if it takes too long to finish [duplicate]
http://stackoverflow.com/questions/2281850/timeout-function-if-it-takes-too-long-to-finish
[5] 原始碼之Queue
http://www.cnblogs.com/liqxd/p/5104051.html
[6] python多執行緒程式設計(9) Queue模組
http://beginman.cn/python/2015/12/01/python-threading-queue/
[7] Python 並行任務技巧
http://my.oschina.net/leejun2005/blog/194270?fromerr=mNcoWQlp
[8] 利用 Python yield 建立協程將非同步程式設計同步化
http://my.oschina.net/leejun2005/blog/501448?fromerr=ynpLsTXB
[9] Python 多執行緒教程:併發與並行
http://my.oschina.net/leejun2005/blog/398826
[10] 理解 Python 中的多執行緒
http://my.oschina.net/leejun2005/blog/179265
[11] paramiko小記