基於anaconda3的引擎供容器呼叫本地化實踐
背景: 現有基於anaconda3開發的引擎指令碼,用於動態識別處理資料;
場景: 現在這個引擎是服務化,通過網路請求來呼叫; 我們的呼叫端是一個容器化的服務,因為引擎的入參出引數據量大,所以現在的服務化方案需要修改,本地化此引擎來減少網路傳輸的時間,達到優化的目的;
方案思考:
方案一: 將引擎整合到服務映象中,這樣可以本地呼叫;
方案二: 將引擎打成單獨的映象,放到容器的宿主機上,將儲存共同掛載到相同目錄,保證容器與引擎容器共享儲存,再通過flask暴露介面來啟動這個容器;
方案結論:
-
捨棄了方案一,因為引擎太大了,單獨的映象達到快3G,若整合到服務映象中,會導致服務映象過大;且在相同容器中執行,併發效能也有一定的瓶頸(對於服務容器有資源限制);
-
在方案二中更加的靈活,可以將引擎本地化模組獨立開來;因為容器化,所以對於資料處理效率更高,併發效能也更好(可以同時執行多個容器);
實踐:
-
首先我們需要將引擎打成映象
FROM continuumio/anaconda3:latest COPY /guanlian_v4.0_for_engine_local /engine ENTRYPOINT ["python", "/engine/process_bm_local.py"]
-
首先基於anaconda3作為基礎映象;
-
然後將引擎指令碼複製到容器中;
-
使用ENTRYPOINT主要是為了傳參,因為我們需要將入參檔案的路徑傳進去,引擎才能知道位置來進行處理;而在引擎執行處理完畢後就會關閉當前容器。
通過命令打成映象
docker build -t engine:v1 .
-
-
啟動容器進行功能測試
docker run -v /home/test:/engine_data -it engine:v1 /engine_data/in.txt
檢視log,引擎執行正常,最後輸出結果:
finish in.txt in.txt_guanlian.log in.txt_out.json
驗證得知容器執行OK
-
建立一個介面,來執行docker啟動命令,我們使用flask(簡單)
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Author: yxhe # Date: 2021/6/9/0009 21:38 # ---------------------------------------------------------- import os import threading from flask import Flask, request app = Flask(__name__) @app.route('/engine_gl/execute', methods=['POST']) def translate(): folder = request.args.get("folder") file_name = request.args.get("iname") base_work_dir = request.args.get("workDir") in_path = base_work_dir + '/' + folder thread = threading.Thread(target=exe, args=(in_path, file_name, )) thread.start() return 'ok' def exe(in_path, file_name): os.system("docker run -v " + in_path + ":/engine_data -i engine:v1 /engine_data/" + file_name) os.system("touch " + in_path + "/finish") if __name__ == '__main__': app.run(host="0.0.0.0", port=8080, debug=False)
-
os.system() 方法會阻塞主執行緒,導致當前請求時間過長。最終可能請求超時;所以這裡使用執行緒非同步處理這段邏輯;
-
非同步之後,呼叫方就無法知道程式什麼時候跑完;因為需要進資料夾再拿取輸出的檔案資料;所以這邊寫入了finish檔案來標識輸出檔案完畢,可以獲取;
-
這裡遇見一個問題,指令碼執行的時候
nohup python test.py &
通過postman呼叫的時候,發現報錯如下:
the input device is not a TTY
這個錯誤是因為我們啟動docker命令的時候添加了-t配置,去掉即可
-
-
服務內部封裝本地化呼叫程式碼;
@Component @Slf4j public class AssociateEngine { @Value("${engine.associate.mount}") private String path; @Value("${engine.associate.host-mount}") private String host_mount; @Value("${engine.associate.url}") private String engine_url; @Value("${engine.associate.polling-times}") private int polling_times; @Value("${engine.associate.polling-interval}") private int polling_interval; // 30s @Value("${engine.associate.file-in-name}") private String in_name; @Value("${engine.associate.file-out-suffix}") private String out_suffix; public String execute(String json) { try { // 首先建立一個工作目錄資料夾 String folder = String.valueOf(System.currentTimeMillis()); String workDir = createWorkDir(this.splicingPath(true, path, folder)); // 將輸入引數json寫入檔案中 writeStringToFile(json, this.splicingPath(false, workDir, in_name)); log.info("本地化引擎,資料已寫入容器{}, 宿主機:{}", this.splicingPath(false, workDir, in_name), this.splicingPath(false, host_mount, folder, in_name)); // 呼叫Python指令碼中的api,觸發docker容器執行 invoke(engine_url + "?folder=%s&iname=%s&workDir=%s", folder, in_name, this.splicingPath(true, host_mount)); // 這裡是一個輪詢,來獲取執行結果,以查詢到finish檔案為結束;或者以輪詢次數超過最大限制次數為結束 String pollingResultPath = pollingResult(workDir, in_name); // 獲取到本地化引擎輸出的檔案結果目錄路徑 log.info("本地化引擎,獲取引擎執行結果{}", pollingResultPath); if (pollingResultPath == null) { log.error("引擎呼叫失敗!"); return null; } // 讀取輸出的檔案結果並返回 return readEngineResult(pollingResultPath); } catch (Exception e) { e.printStackTrace(); log.error("引擎報錯~ ,errorInfo: {}", e.getMessage(), e); } return null; } private String readEngineResult(String pollingResultPath) throws IOException { FileInputStream fin = new FileInputStream(pollingResultPath); InputStreamReader reader = new InputStreamReader(fin); BufferedReader buffReader = new BufferedReader(reader); StringBuilder stringBuilder = new StringBuilder(); String line; while((line = buffReader.readLine())!=null){ stringBuilder.append(line); } buffReader.close(); return stringBuilder.toString(); } private String pollingResult(String workDir, String inName) throws InterruptedException { int times = 1; // 輪詢 for(;;) { File file = new File(workDir); File[] fs = file.listFiles(); if (fs == null) { log.error("檔案目錄錯誤,目錄為空!"); return null; } for (File f : fs) { if (f.getName().equals("finish")) { return this.splicingPath(false, workDir, inName + out_suffix); } } log.info("本地化引擎,獲取引擎執行結果第{}次失敗。。。", times); if (times >= polling_times) { log.error("輪詢時間到,未查詢到正確結果!!"); return null; } times++; Thread.sleep(polling_interval); } } private String createWorkDir(String path) { File file = new File(path); if (!file.exists()) { boolean mkdirs = file.mkdirs(); if (mkdirs) { return path; } } return path; } private void writeStringToFile(String json, String filePath) throws IOException { OutputStream out = new FileOutputStream(filePath); out.write(json.getBytes()); out.flush(); out.close(); } private void invoke(String url, String folder, String inName, String hostMount) { HttpClientUtil.post(String.format(url, folder, inName, hostMount)); } private String splicingPath(boolean includeFinish, String... paths) { StringBuilder stringBuilder = new StringBuilder(); for (String path : paths) { stringBuilder.append(path); if (!path.endsWith(File.separator)) { stringBuilder.append(File.separator); } } String finalPath = stringBuilder.toString(); if (includeFinish) { return finalPath; } return finalPath.substring(0, finalPath.length()-1); } }
結論
至此解決引擎本地化的問題;