1. 程式人生 > >HTML5 Web Worker 多執行緒與執行緒池

HTML5 Web Worker 多執行緒與執行緒池

筆者最近對專案進行優化,順帶就改了些東西,先把請求方式優化了,使用到了web worker。筆者發現目前還沒有太多深入對web worker的使用的文章,除了涉及到一些WebGL的文章,所以總結了這個文章,給大家參考參考。一下內容以預設你對web worker已經有了初步瞭解,不會講解基礎知識。

一、為什麼要開子執行緒

筆者這個專案是一個儲存系統的中後臺管理GUI,一些資料需要通過CronJob定時地去獲取,並且業務對資料的即時性要求高,大量的和持久的HTTP請求是不可避免的,並且該專案部署了HTTP/2,頻寬和併發數可以極大。而且不需要相容IE系列,哈哈哈,針對這些點於是決定優(瞎)化(弄)。筆者一開始想到的就是使用HTML5的新特性web worker,然後將HTTP的請求工作從主執行緒放到子執行緒裡面去做,工作完成後,返回子執行緒資料即可。這樣可以降低主執行緒中的負荷,使主執行緒可以坐享其成。一旦子執行緒中發起的請求成功或錯誤後,子執行緒返回給主執行緒請求的response物件或者直接返回請求得到的資料或錯誤資訊。最終的方案裡,選擇的是直接返回請求得到的資料,而不是response物件,這個在後面會詳細說明為什麼這樣做。子執行緒對於處於複雜運算,特別是搭配wasm,對於處理WebGL幀等有極大的效能優勢。以往的純JS視訊解碼,筆者只看到過能夠解碼MPEG1(大概240P畫面)的canvas庫,因為要達到60幀的畫面流暢度,就必須保證1幀的計算時間要小於16ms,如果要解碼1080P的畫面甚至4K,JS可能跑不過來了,而且長時間的計算會嚴重阻塞主執行緒,影響頁面效能,如果能開啟子執行緒把計算任務交給子執行緒做,並通過wasm加快計算速度,這將在前端領域創造極大的可能性。

二、為什麼要設計執行緒池

如果只開一個執行緒,工作都在這一個子執行緒裡做,不能保證它不阻塞。如果無止盡的開啟而不進行控制,可能導致執行管理平臺應用時,瀏覽器的記憶體消耗極高:一個web worker子執行緒的開銷大概在5MB左右。

無論這5MB記憶體是否已被這個子執行緒完全使用,還是說僅僅是給這個子執行緒預規劃的記憶體空間,但這個空間確實是被佔用了。並且頻繁地建立和終止執行緒,對效能的消耗也是極大的。所以我們需要通過執行緒池來根據瀏覽器所在計算機的硬體資源對子執行緒的工作進行規劃和排程,以及對殭屍執行緒的清理、新執行緒的開闢等等。根據測試,在頁面關閉以後,主執行緒結束,子執行緒的記憶體佔用會被一併釋放,這點不需要做額外的處理。

三、設計執行緒池

對於執行緒池,我們需要實現的功能有如下這些,程式碼中的英文註釋為筆者專案上的需求,因為之前有外國同事在一起開發專案,為了他們閱讀程式碼方便,所以統一使用英文註釋,可以直接忽略,筆者會對重要地方直接給出說明。

1. 初始化執行緒

通過 Navagitor 物件的 HardWareConcurrecy 屬性可以獲取瀏覽器所屬計算機的CPU核心數量,如果CPU有超執行緒技術,這個值就是實際核心數量的兩倍。當然這個屬性存在相容性問題,如果取不到,則預設為4個。我們預設有多少個CPU執行緒數就開多少個子執行緒。執行緒池最大執行緒數量就這麼確定了,簡單而粗暴:


class FetchThreadPool {
    constructor (option = {}){
        const {
            inspectIntervalTime = 10 * 1000,
            maximumWorkTime = 30 * 1000
        } = option;
        this.maximumThreadsNumber = window.navigator.hardwareConcurrency || 4;
        this.threads = [];
        this.inspectIntervalTime = inspectIntervalTime;
        this.maximumWorkTime = maximumWorkTime;
        this.init();
    }<br>   ......<br>}

獲取到最大執行緒數量後,我們就可以根據這個數量來初始化所有的子執行緒了,並給它們額外加上一個我們需要的屬性:


  init (){
        for (let i = 0; i &lt; this.maximumThreadsNumber; i ++){
            this.createThread(i);
        }
        setInterval(() =&gt; this.inspectThreads(), this.inspectIntervalTime);
    }

    createThread (i){
        // Initialize a webWorker and get its reference.
        const thread = work(require.resolve('./fetch.worker.js'));
        // Bind message event.
        thread.addEventListener('message', event =&gt; {
            this.messageHandler(event, thread);
        });
        // Stick the id tag into thread.
        thread['id'] = i;
        // To flag the thread working status, busy or idle.
        thread['busy'] = false;
        // Record all fetch tasks of this thread, currently it is aimed to record reqPromise.
        thread['taskMap'] = {};
        // The id tag mentioned above is the same with the index of this thread in threads array.
        this.threads[i] = thread;
    }

其中:

id為數字型別,表示這個執行緒的唯一標識,

busy為布林型別,表示這個執行緒當前是否處於工作繁忙狀態,

taskMap為物件型別,存有這個執行緒當前的所有工作任務的key/value對,key為任務的ID taskId,value為這個任務的promise的resolve和reject回撥物件。

由上圖還可以看出,在初始化每個子執行緒時我們還給這個子執行緒在主執行緒裡綁定了接收它訊息的事件回撥。在這個回撥裡面我們可以針對子執行緒返回的訊息,在主執行緒裡做對應的處理:


  messageHandler (event, thread){
        let {channel, threadCode, threadData, threadMsg} = event.data;
        // Thread message ok.
        if (threadCode === 0){
            switch (channel){
                case 'fetch':
                    let {taskId, code, data, msg} = threadData;
                    let reqPromise = thread.taskMap[taskId];
                    if (reqPromise){
                        // Handle the upper fetch promise call;
                        if (code === 0){
                            reqPromise.resolve(data);
                        } else {
                            reqPromise.reject({code, msg});
                        }
                        // Remove this fetch task from taskMap of this thread.
                        thread.taskMap[taskId] = null;
                    }
                    // Set the thread status to idle.
                    thread.busy = false;
                    this.redirectRouter();
                    break;

                case 'inspection':
                    // console.info(`Inspection info from thread, details: ${JSON.stringify(threadData)}`);
                    // Give some tips about abnormal worker thread.
                    let {isWorking, workTimeElapse} = threadData;
                    if (isWorking &amp;&amp; (workTimeElapse &gt; this.maximumWorkTime)){
                        console.warn(`Fetch worker thread ID: ${thread.id} is hanging up, details: ${JSON.stringify(threadData)}, it will be terminated.`);
                        fetchThreadPool.terminateZombieThread(thread);
                    }
                    break;

                default:
                    break;
            }
        } else {
            // Thread message come with error.
            if (threadData){
                let {taskId} = threadData;
                // Set the thread status to idle.
                thread.busy = false;
                let reqPromise = thread.taskMap[taskId];
                if (reqPromise){
                    reqPromise.reject({code: threadCode, msg: threadMsg});
                }
            }
        }
    }

這裡處理的邏輯其實挺簡單的:

1). 首先規定了子執行緒和主執行緒之間通訊的資料格式:


{
     threadCode: 0,
     threadData: {taskId, data, code, msg}, 
     threadMsg:  'xxxxx',
     channel: 'fetch',
}

其中:

threadCode: 表示這個訊息是否正確,也就是子執行緒在post這次message的時候,是否是因為報錯而發過來,因為我們在子執行緒中會有這個設計機制,用來區分任務完成後的正常的訊息和執行過程中因報錯而傳送的訊息。如果為正常訊息,我們約定為0,錯誤訊息為1,暫定只有1。

threadData: 表示訊息真正的資料載體物件,如果threadCode為1,只返回taskId,以幫助主執行緒銷燬找到呼叫上層promise的reject回撥函式。Fecth取到的資料放在data內部。

threadMsg: 表示訊息錯誤的報錯資訊。非必須的。

channel: 表示資料頻道,因為我們可能通過子執行緒做其他工作,在我們這個設計裡至少有2個工作,一個是發起fetch請求,另外一個是響應主執行緒的檢查(inspection)請求。所以需要一個額外的頻道欄位來確認不同工作。

這個資料格式在第4步的子執行緒的設計中,也會有對應的體現。

2). 如果是子執行緒回覆的檢查訊息,那麼根據子執行緒返回的狀態決定這個子執行緒是否已經掛起了,如果是就把它當做一個殭屍執行緒殺掉。並重新建立一個子執行緒,替換它原來的位置。

3). 在任務結束後,這個子執行緒的busy被設定成了false,表示它重新處於閒置狀態。

4). 在給子執行緒派發任務的時候,我們post了taskId,在子執行緒的回覆資訊中,我們可以拿到這個taskId,並通過它找到對應的promise的resolve或者reject回撥函式,就可以響應上層業務中Fetch呼叫,返回從服務端獲取的資料了。

2、執行主執行緒中Fetch呼叫的工作

首先,我們在主執行緒中封裝了統一呼叫Fetch的收口,頁面所有請求均走這個唯一入口,對外暴露Get和Post方法,裡面的業務有關的部分程式碼可以忽略:


const initRequest = (url, options) =&gt; {
    if (checkRequestUnInterception(url)){
        return new Promise(async (resolve, reject) =&gt; {
            options.credentials = 'same-origin';
            options.withCredentials = true;
            options.headers = {'Content-Type': 'application/json; charset=utf-8'};
            fetchThreadPool.dispatchThread({url, options}, {resolve, reject});
        });
    }
};

const initSearchUrl = (url, param) =&gt; (param ? url + '?' + stringify(param) : url);

export const fetchGet = (url, param) =&gt; (initRequest(initSearchUrl(url, param), {method: 'GET'}));

export const fetchPost = (url, param) =&gt; (initRequest(url, {method: 'POST', body: JSON.stringify(param)}));

線上程池中,我們實現了對應的方法來執行Fetch請求:


    dispatchThread ({url, options}, reqPromise){
        // Firstly get the idle thread in pools.
        let thread = this.threads.filter(thread =&gt; !thread.busy)[0];
        // If there is no idle thread, get a thread by random.
        if (!thread){
            thread = this.threads[randomNumberBoth(0, this.threads.length - 1)];
        }
        // Stick the reqPromise into taskMap of thread.
        let taskId = Date.now();
        thread.taskMap[taskId] = reqPromise;
        // Dispatch fetch work to thread.
        thread.postMessage({
            channel: 'fetch',
            data: {url, options, taskId}
        });
        thread.busy = true;
    }

這裡排程的邏輯是:

1). 首先遍歷當前所有的子執行緒,過濾出閒置中的子執行緒,取第一個來下發任務。

2). 如果沒有閒置的子執行緒,那麼在當前子執行緒中隨機找一個,來下發任務。這也是為什麼每個子執行緒不直接使用task屬性,而給它一個taskMap,就是因為一個子執行緒可能同時擁有兩個任務。

3、定時輪訓檢查執行緒與終結殭屍執行緒


   inspectThreads (){
        if (this.threads.length &gt; 0){
            this.threads.forEach(thread =&gt; {
                // console.info(`Inspection thread ${thread.id} starts.`);
                thread.postMessage({
                    channel: 'inspection',
                    data: {id: thread.id}
                });
            });
        }
    }

    terminateZombieThread (thread){
        let id = thread.id;
        this.threads.splice(id, 1, null);
        thread.terminate();
        thread = null;
        this.createThread(id);
    }

從第1步的程式碼中我們可以得知初始化定時檢查 inspectThreads 是在整個執行緒池init的時候執行的。對於檢查殭屍執行緒和執行 terminateZombieThread 也是在第1步中的處理子執行緒資訊的回撥函式中進行的。

4. 子執行緒的設計

子執行緒的設計,相對於執行緒池來說就比較簡單了:


export default self =&gt; {
    let isWorking = false;
    let startWorkingTime = 0;
    let tasks = [];
    self.addEventListener('message', async event =&gt; {
        const {channel, data} = event.data;
        switch (channel){
            case 'fetch':
                isWorking = true;
                startWorkingTime = Date.now();
                let {url, options, taskId} = data;
                tasks.push({url, options, taskId});
                try {
                    // Consider to web worker thread post data to main thread uses data cloning
                    // not change the reference. So, here we don't post the response object directly,
                    // because it is un-cloneable. If we persist to post id, we should use Transferable
                    // Objects, such as ArrayBuffer, ImageBitMap, etc. And this way is just like to
                    // change the reference(the control power) of the object in memory.
                    let response = await fetch(self.origin + url, options);
                    if (response.ok){
                        let {code, data, msg} = await response.json();
                        self.postMessage({
                            threadCode: 0,
                            channel: 'fetch',
                            threadData: {taskId, code, data, msg},
                        });
                    } else {
                        const {status, statusText} = response;
                        self.postMessage({
                            threadCode: 0,
                            channel: 'fetch',
                            threadData: {taskId, code: status, msg: statusText || `http error, code: ${status}`},
                        });
                        console.info(`%c HTTP error, code: ${status}`, 'color: #CC0033');
                    }
                } catch (e){
                   self.postMessage({
                       threadCode: 1,
                       threadData: {taskId},
                       threadMsg: `Fetch Web Worker Error: ${e}`
                   });
                }
                isWorking = false;
                startWorkingTime = 0;
                tasks = tasks.filter(task =&gt; task.taskId !== taskId);
                break;

            case 'inspection':
                // console.info(`Receive inspection thread ${data.id}.`);
                self.postMessage({
                    threadCode: 0,
                    channel: 'inspection',
                    threadData: {
                        isWorking,
                        startWorkingTime,
                        workTimeElapse: isWorking ? (Date.now() - startWorkingTime) : 0,
                        tasks
                    },
                });
                break;

            default:
                self.postMessage({
                    threadCode: 1,
                    threadMsg: `Fetch Web Worker Error: unknown message channel: ${channel}}.`
                });
                break;
        }
    });
};

首先,我們在正常的Fecth成功後的資料通訊中,post的是對response處理以後的結構化資料,而不是直接post這個response物件,這個在第一章節中有提到,這裡詳細說一下:

Fetch請求的response物件並非單純的Object物件。在子執行緒和主執行緒之間使用postMessage等方法進行資料傳遞,資料傳遞的方式是克隆一個新的物件來傳遞,而非直接傳遞引用,但response物件作為一個非普通的特殊物件是不可以被克隆的......。要傳遞response物件只有就需要用到HTML5裡的一些新特性比如  Transferable object 的 ArrayBuffer  、 ImageBitmap  等等,通過它們可以直接傳遞物件的引用,這樣做的話就不需要克隆物件了,進而避免因對response物件進行克隆而報錯,以及克隆含有大量資料的物件帶來的高額開銷。但對於這個 Transferable Object 的使用,還需筆者有時間再研究,這裡先選擇傳遞一個普通的Object物件來現實基本的功能。

對於子執行緒中每次給主執行緒post的message,也是嚴格按照第1步中說明的那樣定義的。

還有一點需要說明:筆者的專案都是基於webpack的模組化開發,要直接使用一個web worker的js檔案,筆者選了"webworkify-webpack"這個庫來處理模組化的,這個庫還執行在子執行緒中隨意import其他模組,使用及其方便:


import work from 'webworkify-webpack';

所以,在第1步中才出現了這樣的建立子執行緒的方式: const thread = work(require.resolve('./fetch.worker.js')); 

該庫把web worker的js檔案通過  createObjectURL 方法把js檔案內容轉成了二進位制格式,這裡請求的是一個二進位制資料的連結(引用),將會到記憶體中去找到這個資料,所以這裡並不是一個js檔案的連結:

如果你的專案形態和筆者不同,大可不必如此,按照常規的web worker教程中的指導方式走就行。

筆者這個專案在主執行緒和子執行緒之間只傳遞了很少量的資料,速度非常快,一旦你的專案需要去傳遞大量資料,比如說一個異常複雜的大物件,如果直接傳遞結構化物件,速度會很慢,可以先字串化了以後再發送,避免了在post的過程中時間消耗過大。

筆者捕捉到的一個postMessage的消耗,如果資料量小的話,還算正常:

5. 通過子執行緒發起請求


    getClusterPhysicalNodeList (){
        requestHandler(async () =&gt; {
            let data = await fetchGet('/api/getnodelist');
            !!data &amp;&amp; store.dispatch(dashboardAction.setClusterPhysicalNodeList(data));
        });
    }

資料回來了:

 

從截圖中可以看出,和直接在主執行緒中發起的Fetch請求不同的是,在子執行緒中發起的請求,在Name列裡會增加一個齒輪在開頭以區分。

需要注意的一點是:如果子執行緒被終結,無法檢視返回資訊等,因為這些資料的佔用記憶體已經隨子執行緒的終結而被回收了。

我們在子執行緒中寫一個明顯的錯誤,也會回撥reject,並在控制檯報錯:

從開發者工具裡可以檢測到這8個子執行緒:

大概的設計就是如此,還需要在業務中進行測試和增強。

四、Web Worker的相容性

從caniuse給出的資料來看,相容性異常的好,甚至連IE系列都在好幾年前就已經支援:

但是...,這個相容性只能說明能否使用Web Woker,這裡的相容並不能表明能在其中做其他操作。比如標準規定,可以在子執行緒做做計算、發起XHR請求等,但不能操作DOM物件。筆者在專案中使用的Fetch,而非Ajax,然後Fecth在IE系列(包括Edge)瀏覽器中並不支援,會直接報錯。在近新版本的Chrome、FireFox、Opera中均無任何問題。後來作者換成了Axios這種基於原生的XHR封裝的庫,在IE系列中還是要報錯。後來又換成了純原生的XmlHttpRequest,依舊報錯。這就和標準有出入了......。同學們可以試試,不知到筆者的方法是否百分百正確。但欣慰的是前幾天的新聞說微軟未來在Edge瀏覽器開發中將使用Chromium核心。

至於Web Woker衍生出來的其他新特性,比如 Shared Web Woker等,或者在子執行緒中再開子執行緒,這些特性的使用在各個瀏覽器中並不統一,有些支援,有些不支援,或者部分支援,所以對於這些特性暫時就不要去考慮它們了。

五、展望

在前端開發這塊(沒用Web前端了,是筆者認為現在的前端開發已經不僅限於Web平臺了,也不僅限於前端了),迄今為止活躍度是非常之高了。新技術、新標準、新協議、新框(輪)架(子)的出現是非常快速的。技術跌該更新頻率極高,比如這個Web Worker,四年前就定稿了,筆者現在針對它寫部落格......。一個新技術的出現可能不能造成什麼影響,但是多種新技術的出現和搭配使用將帶來翻天覆地的變化。前端的發展越來越多地融入了曾經只在Client、Native端出現的技術。特別是近年來的WebGL、wasm等新技術的推出,都是具有意義的。

原文地址:https://www.cnblogs.com/rock-roll/p/10176738.html