1. 程式人生 > 其它 >基於anaconda3的引擎供容器呼叫本地化實踐

基於anaconda3的引擎供容器呼叫本地化實踐

背景: 現有基於anaconda3開發的引擎指令碼,用於動態識別處理資料;

場景: 現在這個引擎是服務化,通過網路請求來呼叫; 我們的呼叫端是一個容器化的服務,因為引擎的入參出引數據量大,所以現在的服務化方案需要修改,本地化此引擎來減少網路傳輸的時間,達到優化的目的;

方案思考:

方案一: 將引擎整合到服務映象中,這樣可以本地呼叫;

方案二: 將引擎打成單獨的映象,放到容器的宿主機上,將儲存共同掛載到相同目錄,保證容器與引擎容器共享儲存,再通過flask暴露介面來啟動這個容器;

方案結論:
  • 捨棄了方案一,因為引擎太大了,單獨的映象達到快3G,若整合到服務映象中,會導致服務映象過大;且在相同容器中執行,併發效能也有一定的瓶頸(對於服務容器有資源限制);

  • 在方案二中更加的靈活,可以將引擎本地化模組獨立開來;因為容器化,所以對於資料處理效率更高,併發效能也更好(可以同時執行多個容器);

實踐:

  1. 首先我們需要將引擎打成映象

    FROM continuumio/anaconda3:latest
    
    COPY /guanlian_v4.0_for_engine_local /engine
    
    ENTRYPOINT ["python", "/engine/process_bm_local.py"]
    
    • 首先基於anaconda3作為基礎映象;

    • 然後將引擎指令碼複製到容器中;

    • 使用ENTRYPOINT主要是為了傳參,因為我們需要將入參檔案的路徑傳進去,引擎才能知道位置來進行處理;而在引擎執行處理完畢後就會關閉當前容器。

    通過命令打成映象

    docker build -t engine:v1 .
    
  2. 啟動容器進行功能測試

    docker run -v /home/test:/engine_data -it engine:v1 /engine_data/in.txt
    

    檢視log,引擎執行正常,最後輸出結果:

    finish  in.txt  in.txt_guanlian.log  in.txt_out.json
    

    驗證得知容器執行OK

  3. 建立一個介面,來執行docker啟動命令,我們使用flask(簡單)

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    # Author: yxhe
    # Date: 2021/6/9/0009 21:38
    # ----------------------------------------------------------
    
    import os
    import threading
    
    from flask import Flask, request
    
    app = Flask(__name__)
    
    
    @app.route('/engine_gl/execute', methods=['POST'])
    def translate():
        folder = request.args.get("folder")
        file_name = request.args.get("iname")
        base_work_dir = request.args.get("workDir")
        in_path = base_work_dir + '/' + folder
        thread = threading.Thread(target=exe, args=(in_path, file_name, ))
        thread.start()
        return 'ok'
    
    
    def exe(in_path, file_name):
        os.system("docker run -v " + in_path + ":/engine_data -i engine:v1 /engine_data/" + file_name)
        os.system("touch " + in_path + "/finish")
    
    
    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=8080, debug=False)
    
    
    • os.system() 方法會阻塞主執行緒,導致當前請求時間過長。最終可能請求超時;所以這裡使用執行緒非同步處理這段邏輯;

    • 非同步之後,呼叫方就無法知道程式什麼時候跑完;因為需要進資料夾再拿取輸出的檔案資料;所以這邊寫入了finish檔案來標識輸出檔案完畢,可以獲取;

    • 這裡遇見一個問題,指令碼執行的時候

      nohup python test.py &
      

      通過postman呼叫的時候,發現報錯如下:

      the input device is not a TTY
      

      這個錯誤是因為我們啟動docker命令的時候添加了-t配置,去掉即可

  4. 服務內部封裝本地化呼叫程式碼;

    @Component
    @Slf4j
    public class AssociateEngine {
        @Value("${engine.associate.mount}")
        private String path;
        @Value("${engine.associate.host-mount}")
        private String host_mount;
        @Value("${engine.associate.url}")
        private String engine_url;
        @Value("${engine.associate.polling-times}")
        private int polling_times;
        @Value("${engine.associate.polling-interval}")
        private int polling_interval; // 30s
        @Value("${engine.associate.file-in-name}")
        private String in_name;
        @Value("${engine.associate.file-out-suffix}")
        private String out_suffix;
    
        public String execute(String json) {
            try {
                // 首先建立一個工作目錄資料夾
                String folder = String.valueOf(System.currentTimeMillis());
                String workDir = createWorkDir(this.splicingPath(true, path, folder));
                // 將輸入引數json寫入檔案中
                writeStringToFile(json, this.splicingPath(false, workDir, in_name));
                log.info("本地化引擎,資料已寫入容器{}, 宿主機:{}",  this.splicingPath(false, workDir, in_name), this.splicingPath(false, host_mount, folder, in_name));
                // 呼叫Python指令碼中的api,觸發docker容器執行
                invoke(engine_url + "?folder=%s&iname=%s&workDir=%s", folder, in_name, this.splicingPath(true, host_mount));
                // 這裡是一個輪詢,來獲取執行結果,以查詢到finish檔案為結束;或者以輪詢次數超過最大限制次數為結束
                String pollingResultPath = pollingResult(workDir, in_name);
                // 獲取到本地化引擎輸出的檔案結果目錄路徑
                log.info("本地化引擎,獲取引擎執行結果{}", pollingResultPath);
                if (pollingResultPath == null) {
                    log.error("引擎呼叫失敗!");
                    return null;
                }
                // 讀取輸出的檔案結果並返回
                return readEngineResult(pollingResultPath);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("引擎報錯~ ,errorInfo: {}", e.getMessage(), e);
            }
            return null;
        }
    
        private String readEngineResult(String pollingResultPath) throws IOException {
            FileInputStream fin = new FileInputStream(pollingResultPath);
            InputStreamReader reader = new InputStreamReader(fin);
            BufferedReader buffReader = new BufferedReader(reader);
            StringBuilder stringBuilder = new StringBuilder();
            String line;
            while((line = buffReader.readLine())!=null){
                stringBuilder.append(line);
            }
            buffReader.close();
            return stringBuilder.toString();
        }
    
        private String pollingResult(String workDir, String inName) throws InterruptedException {
            int times = 1;
            // 輪詢
            for(;;) {
                File file = new File(workDir);
                File[] fs = file.listFiles();
                if (fs == null) {
                    log.error("檔案目錄錯誤,目錄為空!");
                    return null;
                }
                for (File f : fs) {
                    if (f.getName().equals("finish")) {
                        return this.splicingPath(false, workDir, inName + out_suffix);
                    }
                }
                log.info("本地化引擎,獲取引擎執行結果第{}次失敗。。。", times);
                if (times >= polling_times) {
                    log.error("輪詢時間到,未查詢到正確結果!!");
                    return null;
                }
                times++;
                Thread.sleep(polling_interval);
            }
        }
    
        private String createWorkDir(String path) {
            File file = new File(path);
            if (!file.exists()) {
                boolean mkdirs = file.mkdirs();
                if (mkdirs) {
                    return path;
                }
            }
            return path;
        }
    
        private void writeStringToFile(String json, String filePath) throws IOException {
            OutputStream out = new FileOutputStream(filePath);
            out.write(json.getBytes());
            out.flush();
            out.close();
        }
    
        private void invoke(String url, String folder, String inName, String hostMount) {
            HttpClientUtil.post(String.format(url, folder, inName, hostMount));
        }
    
        private String splicingPath(boolean includeFinish, String... paths) {
            StringBuilder stringBuilder = new StringBuilder();
            for (String path : paths) {
                stringBuilder.append(path);
                if (!path.endsWith(File.separator)) {
                    stringBuilder.append(File.separator);
                }
            }
            String finalPath = stringBuilder.toString();
            if (includeFinish) {
                return finalPath;
            }
            return finalPath.substring(0, finalPath.length()-1);
        }
    }
    
結論

至此解決引擎本地化的問題;