1. 程式人生 > 其它 >記一次python 協程給合多執行緒死鎖問題

記一次python 協程給合多執行緒死鎖問題

前言

死鎖和具體的開發語言無關,工業界使用的主流開發語言者都提供並行/併發,執行緒/程序,及各種鎖的元語

多執行緒導致的死鎖在現在的程式碼開發中已經很少見了,現在日常談到的死鎖主要是sql這類db的事務導致的sql死鎖

因為大部分開發工作已經很少直接和鎖打交道,都是各種封裝好的元件,如java的juc等。

以java為例,早年jdk1.6,jdk1.7的是時候,還有大量直接和鎖打交道的場景,但現在基本都是使用juc實現的工具類

另外一些無鎖架構,例如 lbev/lebuv/akka/回撥/async/await/協程,本身設計上就回避了可能出現死鎖的場景

這類機制個人都搞過

無鎖的 lua,nodejs,golang,akka,及python裡的async/await,gevent機制

有鎖的 c#,java,scala,juc


問題

程式在某次變更後出現假死,執行一段時間後,不再有日誌輸出,消費和生產停滯

其實一開始並沒有想到死鎖,也是對自己的程式碼比較自信,程式碼裡用到了資源池,但讀寫分治,沒想到會有死鎖的部分,最後確實打臉了

因為有大量的io 操作,最初以為是哪個io環節出問題了,且沒有釋放連線

在所有連線處加了超時控制,依然出現假死

因為方法是高度抽象的,該程式的執行元件與其他元件,區別只是某個類的實現不同

別的程式元件完全正常,就這個元件假死,判斷是執行部分異常


簡述下該專案的併發模型

專案為機器學習/深度學習的演算法應用,由於要載入演算法模型做計算(預測/分類),而演算法模型這一片基本是python的天下

因此專案演算法應用部分以python語言開發,同時在演算法應用外的io部分,python語言本身,也不構成效能瓶頸,因此全部使用python,而未使用多語言

傳統的服務主要分為cpu密集,或io密集,web類的讀寫各種db/sql/nosql/mq業務系統是io密集型,佔業力開發總量的90%以上

演算法相關的則為cpu密集,部分mem密集,由於深度學習的工業界應用,深度學習重度的依賴顯示卡資源

因此除cpu密集(cpu通用計算)外,又出現大量gpu密集的場景

機器學習階段,cpu演算法應用的模型普遍較小,記憶體也較為廉價,因此mem並不是個值得注意的點,gpu演算法則因為模型佔用大量的視訊記憶體空間,視訊記憶體昂貴,gpu mem密集也成為必須注意的點


專案功能簡單,但要同時考慮cpu/gpu/mem和io

演算法應用部分既有cpu/gpu/mem/視訊記憶體密集

大量小檔案的讀寫又是io密集

最終專案的選型和優化方案是

計算部分 多執行緒 cpu/gpu密集,平行計算

io 部分協程 async/await,減少執行緒排程,同時提高並行度

mem/視訊記憶體部分(載入模型) 資源池,假設載入一個模型2G,有限視訊記憶體,只能載入4個模型,模型複用,提供4個併發度。

排查最終定位到執行部分

class BoundedThreadPoolExecutor(ThreadPoolExecutor):
    def __init__(self, max_workers, max_waiting_tasks, *args, **kwargs):
        super().__init__(max_workers=max_workers, *args, **kwargs)
        self.model_queue = Queue(maxsize=max_workers)
        for i in range(0, max_workers):
            self.model_queue.put(TextSystem(ocr_config.DEFAULT_TS_ARGS))
            
class DocStageOcrHandle(DocStageHandle):
    def __init__(self, job_name="test_job_name", config_name="test_config.ini", media_type="media_type", use_gpu=False):
        DocStageHandle.__init__(self, job_name, "alg", config_name, media_type)
        ocr_config.DEFAULT_TS_ARGS.use_gpu = use_gpu
        self.exector = BoundedThreadPoolExecutor(4, 8)    
    
    async def _ocr_by_thread_submit(self, media_type: str, rsq_data: bytes, file_name: str, url_info: dict):
        ocr_list_set = []
        # 拿出一個模型
        text_sys = self.exector.model_queue.get(block=True)
        f = self.exector.submit(ocrImageToSetByBuffer, rsq_data, text_sys)
        return f, text_sys

    async def _ocr_by_thread_load_result(self, media_type: None, f: None, text_sys: None):
        result = None
        try:
            result = f.result()
        except Exception as e:
            raise e
        finally:
            # 再把模型放回
            self.exector.model_queue.put(text_sys)
        ocr_list_set = []
        return [result]
      
    async def _ocr_media_without_lock_async_by_pool(self, media_type: str, rsq_data: bytes, url_info: dict):
        try:
                f, text_sys = await self._ocr_by_thread_submit(media_type, rsq_data, "", url_info)
                ocr_list_set = await self._ocr_by_thread_load_result(media_type, f, text_sys)          
        except RuntimeError as exception:
                pass
        return ocr_list_set      

發生死鎖的位置

                f, text_sys = await self._ocr_by_thread_submit(media_type, rsq_data, "", url_info)
                ocr_list_set = await self._ocr_by_thread_load_result(media_type, f, text_sys)          

協程和執行緒並用,出現死鎖

多執行緒死鎖,理解簡單

協程和執行緒死鎖,不好描述,但換個角度,協程實際也是線上程中執行的,本質上依然是執行緒和執行緒間的死鎖

該執行部分的死鎖部分是self.exector.model_queue.get(block=True) ,model_queue的行為類似阻塞佇列

text_sys = self.exector.model_queue.get(block=True)self.exector.model_queue.put(text_sys)

多執行緒分別執行這兩項,並不構成問題

假設兩個執行緒1.執行緒1在讀,執行緒2在寫

即使讀寫不平衡,但因為佇列流動,讀寫會互相觸發,不構成死鎖

協程的話,則有影響

執行緒1,內有多個協程,協程執行到await讓出時間片,執行,再根據排程回當前任務

執行緒2,內有多個協程

協程兩個步驟,分別執行讀和寫,會出現

執行緒1停在子協程的讀上

執行緒2也停在子協程的讀上

兩個執行緒實際,都在讀,都在等待,雖然兩個執行緒內都有一個協程的執行單元在佇列中準備寫,但是因為排程不到,無法寫入


解決方法也很簡單

協程的讀寫,不要由await分割開來,做為一個整體,這樣執行緒的讀寫不對協程暴露,協程不會因為讀/寫的排程而阻塞

現方法

    async def _ocr_media_without_lock_async_by_pool(self, media_type: str, rsq_data: bytes, url_info: dict):
        try:
        #資源池-阻塞讀
                f, text_sys = await self._ocr_by_thread_submit(media_type, rsq_data, "", url_info)
        #資源池-阻塞寫
                ocr_list_set = await self._ocr_by_thread_load_result(media_type, f, text_sys)          
        except RuntimeError as exception:
                pass
        return ocr_list_set    

即改為

         
class DocStageOcrHandle(DocStageHandle):
    def __init__(self, job_name="test_job_name", config_name="test_config.ini", media_type="media_type", use_gpu=False):
        DocStageHandle.__init__(self, job_name, "alg", config_name, media_type)
        ocr_config.DEFAULT_TS_ARGS.use_gpu = use_gpu
        self.exector = BoundedThreadPoolExecutor(4, 8)    
    
    async def _ocr_by_thread(self, media_type: str, rsq_data: bytes, file_name: str, url_info: dict):
        ocr_list_set = []
        # 拿出一個模型
        text_sys = self.exector.model_queue.get(block=True)
        f = self.exector.submit(ocrImageToSetByBuffer, rsq_data, text_sys)
        result = None
        try:
            result = f.result()
        except Exception as e:
            raise e
        finally:
            # 再把模型放回
            self.exector.model_queue.put(text_sys)
        ocr_list_set = []
        return [result]      
      
    async def _ocr_media_without_lock_async_by_pool(self, media_type: str, rsq_data: bytes, url_info: dict):
        try:
                #在一個方法內同時,阻塞讀/阻塞寫,不把阻塞,暴露給協程
                ocr_list_set = await self._ocr_by_thread(media_type, rsq_data, "", url_info)
        except RuntimeError as exception:
                pass
        return ocr_list_set

如此,問題解決

協程/執行緒共用的場景比較少,因此出現這種問題的場景比較少,也比較典型

async/await 分隔了多執行緒對資源的讀寫,async/await排程機制,能讀的阻塞在讀上,能寫的阻塞在讀上,導致死鎖

解決方式就是避免 async/await 分隔多執行緒對資源的讀寫,使多執行緒對資源,在同一個方法內