module05-1-基於RabbitMQ rpc實現的主機管理
阿新 • • 發佈:2017-08-25
not exit net 目錄 min .py 取值 event 機器
rabbitmq_server
需求
題目:rpc命令端
需求:
可以異步的執行多個命令
對多臺機器
>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:
實現需求
1. 實現全部需求
2.會緩存已建立過的連接,減少短時間內連接相同主機時再次建立連接的開銷
3.定時清理緩存的連接
目錄結構
rabbitmq_server ├ bin # 執行文件目錄 | └ rabbitmq_server.py # 執行程序接口 ├ conf # 配置文件目錄 | └ setting.py # 配置文件。目前主要保存用以連接RabbitMQ服務器的遠程用戶權限 └ core # 程序核心代碼位置 └ main.py # 主交互邏輯 rabbitmq_client ├ bin # 執行文件目錄 | └ rabbitmq_client.py # 執行程序 ├ conf # 配置文件目錄 | └ setting.py # 配置文件。目前主要保存用以連接RabbitMQ服務器的遠程用戶權限,以及緩存連接的保存時間 └ core # 程序核心代碼位置 └ main.py # 主邏輯交互程序
代碼
rabbitmq_server
1 import os,sys 2 3 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 4 sys.path.insert(0,BasePath) 5 6 from core import main 7 main.main()rabbitmq_server.py
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 import os,sys,pika,subprocess,locale,threadingmain.py6 7 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 8 sys.path.insert(0,BasePath) 9 10 from conf import setting 11 12 sys_encode = locale.getdefaultlocale()[1] 13 username = setting.username 14 password = setting.password 15 16 credentials = pika.PlainCredentials(username,password)17 18 19 def cb(ch,method,properties,body): 20 command = body.decode(‘utf-8‘) 21 print(‘Received:‘,command) 22 23 res = subprocess.Popen(command,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 24 out,err = res.communicate() 25 res_con = err.decode(sys_encode) if err else out.decode(sys_encode) 26 27 ch.basic_publish( 28 exchange = ‘‘, 29 routing_key = properties.reply_to, 30 properties = pika.BasicProperties( 31 correlation_id = properties.correlation_id 32 ), 33 body = res_con 34 ) 35 print(‘send:‘,res_con) 36 37 38 39 def main(): 40 try: 41 conn = pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘, 5672, ‘/‘, credentials)) 42 except Exception as e: 43 print(e) 44 else: 45 46 try: 47 ch = conn.channel() 48 ch.queue_declare(queue=‘rpc_queue‘) 49 ch.queue_purge(queue=‘rpc_queue‘) 50 51 ch.basic_consume( 52 cb, 53 queue=‘rpc_queue‘ 54 ) 55 56 ch.start_consuming() 57 58 except KeyboardInterrupt: 59 conn.close() 60 print(‘Server closed‘) 61 except Exception as e: 62 conn.close() 63 print(‘Server down because of‘,e) 64 65 if __name__ == ‘__main__‘: 66 main()
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 username = ‘jailly‘ 6 password = ‘123456‘setting.py rabbitmq_client
1 import os,sys 2 3 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 4 sys.path.insert(0,BasePath) 5 6 from core import main 7 main.main()rabbitmq_client
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 import re 6 import time 7 import random 8 import threading 9 import sys 10 import os 11 import pika 12 13 BasePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 14 sys.path.insert(0,BasePath) 15 16 from conf import setting 17 18 username = setting.username 19 password = setting.password 20 connection_timeout = setting.connection_timeout 21 22 run_p = re.compile(‘‘‘ 23 ^\s*run\s+ # run 24 (?P<quote>\‘)?(?P<d_quote>\‘\‘)? # 單引號 或 雙引號 的前一半 25 (?P<command>.+) # command 26 (?(quote)\‘|)(?(d_quote)\‘\‘|) # 單引號 或 雙引號 的後一半 27 \s+--host\s+ 28 (?P<ips> 29 (((([01]?\d?\d)|2[0-4]\d|25[0-5])\.){3}(([01]?\d?\d)|2[0-4]\d|25[0-5])\s+)* # ip 30 ((([01]?\d?\d)|2[0-4]\d|25[0-5])\.){3}(([01]?\d?\d)|2[0-4]\d|25[0-5])\s* # ip 31 )$ 32 ‘‘‘,re.X) # 匹配run指令的re模式對象 33 check_p = re.compile(r‘^\s*check_task\s+(?P<task_id>\d+)\s*$‘) # 匹配check_task指令的re模式對象 34 exit_p = re.compile(r‘^\s*exit\s*$‘) # 匹配exit指令的re模式對象 35 36 task_ids = list(range(65535)) #task_id 的取值範圍 37 38 # tasks是一個保存任務的字典。key為task_id,value是該id對應的RPCClient()對象們組成的列表。創建該變量的目的是為映射 task_id 與其對應的 39 # RPCClient()對象,方便獲取該 task_id 所對應的結果(該結果即RPCClient()對象的response屬性) 40 tasks = {} 41 42 # conns是一個保存連接的字典,用以緩存已建立過的連接。key 為ip,value 是一個列表[rc,last_start_time,task_id_list],rc是該ip對應的 43 # RPCClient()對象,last_start_time是最近一次連接的開始時間,task_id_list 是使用該連接(或該RPCClient()對象)的task_id 所構成的列表。 44 # 創建該變量的目的是為了緩存連接,當在超時時間(默認5分鐘)內再次請求同一個主機時,不用再次創建連接,節省建立連接的開銷,同時也為定期清理連接保留了 45 # 連接記錄 46 conns = {} 47 48 lock = threading.RLock() 49 50 class RPCClient(object): 51 def __init__(self,host,port,vhost,credentials,conn=None): 52 53 self.host = host 54 self.conn = conn if conn else pika.BlockingConnection(pika.ConnectionParameters(host,port,vhost,credentials)) 55 self.ch = self.conn.channel() 56 self.ch.queue_declare(queue=‘rpc_queue‘) 57 58 result = self.ch.queue_declare(exclusive=True) 59 self.callback_queue = result.method.queue 60 61 self.ch.basic_consume( 62 self.cb, 63 queue = self.callback_queue 64 ) 65 66 67 def cb(self,ch,method,properties,body): 68 if self.corr_id == properties.correlation_id: 69 self.response = body 70 71 72 def call(self,command,task_id): 73 self.corr_id = str(task_id) 74 self.response = None 75 76 self.ch.basic_publish( 77 exchange = ‘‘, 78 routing_key = ‘rpc_queue‘, 79 properties = pika.BasicProperties( 80 reply_to = self.callback_queue, 81 correlation_id = self.corr_id 82 ), 83 body = command 84 ) 85 86 while self.response is None: 87 self.conn.process_data_events() 88 89 self.response = ‘\033[1;32m[--- %s ---]\033[0m\n%s\n‘% (self.host,self.response.decode(‘utf-8‘)) 90 91 92 def create_connection(conns,ips,rcs,task_id): 93 ‘‘‘ 94 創建連接,並將生成的 RPCClient()對象 加入 rcs 列表 95 :param conns: 對應 main.py 中的全局變量 conns 96 :param ips: 本次任務對應的遠程主機的 ip 列表 97 :param rcs: 對應 main.py 中的 main_interactive() 中的 rcs 變量:該task_id 對應的 RPCClient()對象們 所組成的列表 98 :param task_id: 本次任務的task_id 99 :return: 100 ‘‘‘ 101 102 credentials = pika.PlainCredentials(username,password) 103 for ip in ips: 104 if ip in conns: 105 rc = RPCClient(ip,5672,‘/‘,credentials,conn=conns[ip][0]) 106 rcs.append(rc) 107 conns[ip][1] = time.time() # 重置“最新連接的開始時間” 108 conns[ip][2].append(task_id) # 添加跟該連接相關的task_id 109 else: 110 rc = RPCClient(ip,5672,‘/‘,credentials) 111 rcs.append(rc) 112 conns[ip] = [rc.conn,time.time(),[task_id,]] 113 114 115 def get_result(tasks,task_id): 116 ‘‘‘ 117 根據task_id 獲取 RabbitMQ 服務器的返回結果 118 :param tasks: 任務列表,對應同名全局變量 119 :param task_id: 任務id 120 :return: 121 ‘‘‘ 122 123 rcs = tasks[task_id] 124 outcome = ‘‘ 125 for rc in rcs: 126 if rc.response is not None: 127 outcome += rc.response 128 else: 129 print(‘Task %s is handling\nIf the time for handling was too long,plz check the server health or your network conditions‘%task_id) 130 return False 131 else: 132 print(outcome) 133 print(‘\033[1;34mTask done,task_id %s has been cleaned up\033[0m‘%task_id ) 134 return True 135 136 137 def handle(command,ips,task_id,conns): 138 ‘‘‘ 139 建立連接,並處理指令。相當於整合了creat_connection() 和 get_result() 140 :param command: 待處理的指令 141 :param ips: 待連接的主機ip列表 142 :param task_id: 任務id 143 :param conns: 保存連接的列表,對應同名全局變量 144 :return: 145 ‘‘‘ 146 147 # 第一步:建立連接 148 rcs = [] # 與本次 task_id 對應的 RPCClient() 對象們組成的列表 149 try: 150 create_connection(conns, ips, rcs, task_id) 151 except Exception as e: 152 print(‘\033[1;31mCan not connect with specified host,plz check server health or make sure your network patency\033[m‘) 153 else: 154 tasks[task_id] = rcs 155 156 # 第二步:處理指令 157 for rc in rcs: 158 tc = threading.Thread(target=rc.call, args=(command, task_id)) 159 tc.setDaemon(True) 160 tc.start() 161 162 163 def main_interactive(): 164 ‘‘‘ 165 主交互邏輯 166 :return: 167 ‘‘‘ 168 169 while 1: 170 cmd = input(‘>> ‘).strip() 171 172 m = run_p.search(cmd) 173 if m: 174 command = m.group(‘command‘) 175 ips = m.group(‘ips‘).split() 176 177 task_id = random.choice(task_ids) 178 task_ids.remove(task_id) 179 print(‘task_id:‘, task_id) 180 181 t = threading.Thread(target=handle,args=(command,ips,task_id,conns)) 182 t.setDaemon(True) 183 t.start() 184 185 else: 186 m = check_p.search(cmd) 187 if m: 188 task_id = m.group(‘task_id‘) 189 if task_id.isdigit(): 190 task_id = int(task_id) 191 if task_id in tasks: 192 if get_result(tasks,task_id): 193 # 取得結果後,釋放task_id,刪除對應的RPCClient() 對象 194 task_ids.append(task_id) 195 del tasks[task_id] 196 else: 197 print(‘‘‘\033[1;31mTask_id not found.It may because: 198 1. task_id does not exist 199 2. connection with specified host has not been created.For this,please try again later 200 3. connection with specified host has been cleaned up because of timeout\033[0m‘‘‘) 201 else: 202 print(‘\033[1;31mTask_id must be integer\033[0m‘) 203 204 else: 205 m = exit_p.search(cmd) 206 if m: 207 for rc in tasks.values(): 208 rc.conn.close() 209 exit() 210 else: 211 if cmd: 212 print(‘\033[1;31mCommand \‘%s\‘ not found\033[0m‘%cmd.split()[0]) 213 else: 214 print(‘Command can not be None‘) 215 216 217 def clean_conns(): 218 ‘‘‘ 219 超時連接清理。 220 每5分鐘(默認值,可在setting中設置)檢查一次,某連接最近一次建立/聲明距現在5分鐘以上,則從conns列表中刪除,同時刪除其對應的task_id和RPCClient()對象 221 註意 : 如果有正在執行的任務,會因為清除對應的RPCClient()對象,而無法取得結果!!! 222 :return: 223 ‘‘‘ 224 225 while 1: 226 time.sleep(connection_timeout) 227 lock.acquire() 228 for ip in conns.copy(): # 不能在本字典叠代時刪除字典元素,可以通過其軟拷貝間接刪除之 229 if time.time() - conns[ip][1] > connection_timeout: # 判斷是否超時 230 for id in conns[ip][2]: 231 232 # 清除與該連接相關的所有任務記錄 233 if id in tasks: 234 del tasks[id] 235 236 del conns[ip] # 清除連接記錄 237 lock.release() 238 239 240 def main(): 241 t = threading.Thread(target=clean_conns) 242 t.setDaemon(True) 243 t.start() 244 245 main_interactive() 246 247 248 if __name__ == ‘__main__‘: 249 main()main.py
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 # Author:Jailly 4 5 username = ‘jailly‘ 6 password = ‘123456‘ 7 connection_timeout = 300setting.py
module05-1-基於RabbitMQ rpc實現的主機管理