1. 程式人生 > >Luat重構版原始碼讀書筆記之排程實現

Luat重構版原始碼讀書筆記之排程實現

前言

重構版是什麼?這兒引用稀飯放姜大神給重構版寫的readme

OpenLuat 基於合宙(AirM2M) 原有Luat平臺重構的一個分支,主要是重構了平臺程式碼,增加了基於協程的多執行緒任務支援,使使用者可以快速進行產品的開發,而不用考慮訊息的繁瑣回撥。避免複雜的回撥導致類似“goto”那種混淆的邏輯,同時保留了原有的訊息回撥機制。

當應用場景需要訊息回撥的時候,依舊可以使用訊息的釋出和訂閱執行模式進行程式設計。

OpenLuat – Task 程式設計

  • OpenLuat 支援多工程式設計,利用底層訊息機制和Lua原生協程完美融合實現了多執行緒支援和多工程式設計,並且保留了訊息機制特有的高實時性和低功耗特性。
  • OpenLuat 提供了基於執行緒阻塞的函式– wait(ms) ,用來幫助使用者解決任務需要延時等待的情況,不同於底層rtos.sleep,呼叫wait(ms)的任務主動釋放資源並掛起,直到延時值滿足被主排程器恢復執行。
  • OpenLuat 提供了訊息機制的條件等待超時處理函式– result, data = waitUntil(id, ms),用來幫助使用者解決一些需要等待條件滿足立刻恢復任務的情況,並提供了超時呼叫回撥函式的處理方式。返回值用作語句執行結束後做進一步處理用,以滿足不同的場景需求。

預備知識

如果對Lua的coroutine 有一定了解,可以跳過這部分。

Lua 支援 coroutine ,這個東西也被稱為協同式多執行緒 (collaborative multithreading)。 Lua 為每個 coroutine 提供一個獨立的執行線路。舉個通俗易懂的例子:去飯店吃飯,假設飯店只有一個廚師,這時候來了三個客人,分別點了一號菜,二號菜,三號菜。如果按照一二三這樣的順序做菜的話,效率很低。現在引入一種新模式,每個菜花2分鐘時間去做。這樣的順序就變為了花兩分鐘做第一道菜,兩分鐘到了,做第二道菜,二分鐘到了,然後第三道菜。這樣的好處是每個客人的菜都有一段時間正在製作過程中,不會出現其他菜必須等到一道菜結束後才可以去做。客人就是上帝,二號客人比較餓,所以可以要求廚師花5分鐘製作二號菜。這樣的好處之一是可以對每道菜靈活分配時間。不太恰當的比喻,廚師就是CPU,客人就是任務。

先看一個簡單的程式:

co = coroutine.create(                                      --1
    function(i)
        print(coroutine.status(co))
        print(i);
    end
)

print(coroutine.status(co))                                 --2
coroutine.resume(co, 1)                                     --3
print(coroutine
.status(co)) --4 --輸出結果 --suspended --running --1 --dead
  • 建立一個 coroutine 需要呼叫一次coroutine.create。它只接收單個引數,這個引數是 coroutine 的主函式。 create 函式僅僅建立一個新的 coroutine 然後返回它的控制器(一個型別為 thread 的物件);它並不會啟動 coroutine 的執行。
  • 輸出當前執行緒狀態,為suspend(掛起,並未執行)
  • 喚醒執行緒,傳入引數,此時執行執行緒,執行緒狀態為running,輸出1
  • 執行緒結束,正常退出,coroutine.resume(co, 1)返回true。輸出執行緒狀態,為dead。注意:dead之後不能再resume(死了的人怎麼能喚醒呢?/滑稽)

這兒提到了三種狀態,畫了一個圖來描述它們之間的關係

1

方法 釋義
coroutine.create() 建立coroutine,返回thread, 引數是一個函式建之後執行緒屬於掛起狀態,並沒有執行!
coroutine.resume() 執行執行緒,和create配合使用,此時執行緒為running狀態。
coroutine.yield() 掛起coroutine,將coroutine設定為掛起狀態。下次執行resume,程式將回到掛起的位置繼續執行而不是從頭再執行。掛起成功返回true
coroutine.status() 檢視coroutine的狀態注:coroutine的狀態有三種:dead,suspend,running。
coroutine.running() 返回正在跑的coroutine,一個coroutine就是一個執行緒,當使用running的時候,就是返回一個corouting的執行緒號

coroutine 可以通過兩種方式來終止執行:一種是正常退出,指它的主函式返回(最後一條指令被執行後,無論有沒有顯式的返回指令); 另一種是非正常退出,它發生在未保護的錯誤發生的時候。第一種情況中, coroutine.resume返回 true,接下來會跟著 coroutine 主函式的一系列返回值。第二種發生錯誤的情況下, coroutine.resume返回 false ,緊接著是一條錯誤資訊。

接下來我們分析一個更詳細的例項(引用於Lua手冊):

function foo (a)                                        --1
    print("foo 函式輸出", a)
    return coroutine.yield(2 * a) -- 返回  2*a 的值
end

co = coroutine.create(function (a , b)                  --2
    print("第一次協同程式執行輸出", a, b) -- co-body 1 10
    local r = foo(a + 1)

    print("第二次協同程式執行輸出", r)
    local r, s = coroutine.yield(a + b, a - b)  -- a,b的值為第一次呼叫協同程式時傳入

    print("第三次協同程式執行輸出", r, s)
    return b, "結束協同程式"                   -- b的值為第二次呼叫協同程式時傳入
end)

print("main", coroutine.resume(co, 1, 10)) -- true, 4       --3
print("--分割線----")
print("main", coroutine.resume(co, "r")) -- true 11 -9      --4
print("---分割線---")
print("main", coroutine.resume(co, "x", "y")) -- true 10 end    --5
print("---分割線---")
print("main", coroutine.resume(co, "x", "y")) -- cannot resume dead coroutine   --5
print("---分割線---")

--輸出結果
--[[
第一次協同程式執行輸出 1   10
foo 函式輸出    2
main    true    4
--分割線----
第二次協同程式執行輸出 r
main    true    11  -9
---分割線---
第三次協同程式執行輸出 x   y
main    true    10  結束協同程式
---分割線---
main    false   cannot resume dead coroutine
---分割線---

]]

顯然,這個例子比上面例子複雜許多,不過只要仔細分析,理解起來也不會困難

  • 呼叫resume喚醒執行緒,並傳參1,10。輸出“第一次協同程式執行輸出 1 10”。接下來執行foo函式,輸出“foo 函式輸出 2”。在foo函式中遇到了yeild,掛起執行緒,此時程式停留在這兒,下次喚醒執行緒時從該處繼續執行。返回yeild的引數。輸出“main true 4”。
    • 第二次呼叫resume喚醒執行緒,傳入引數“r”,注意:此時傳入的引數“r”,賦值給coroutine.yield,所以相當於local r = “r”,輸出“第二次協同程式執行輸出r”。再次遇到yeild,掛起執行緒,此時程式停留在這兒,下次喚醒執行緒時從該處繼續執行。返回yeild的引數。輸出“main true 11 -9”。
      • 第三次呼叫resume喚醒執行緒,傳入引數“x”,“y”,賦值給coroutine.yield,相當於local r,s = “r”,”s”,輸出“第三次協同程式執行輸出xy”。到這兒整個執行緒就結束了,輸出“main true 10 結束協同程式”
  • 第四次呼叫resume喚醒執行緒,此時執行緒已經為dead了,無法喚醒。

resume和yield的配合強大之處在於,resume處於主程中,它將外部狀態(資料)傳入到協同程式內部;而yield則將內部的狀態(資料)返回到主程中。

再舉個小例子說明resume和yield關係

co = coroutine.create (function (a,b)
  local a,b = coroutine.yield(a+b)
  print("co", a,b)
end)
print(coroutine.resume(co,4,5))
coroutine.resume(co, 7, 8)
--輸出
--[[
true    9
co  7   8
]]
  • 呼叫resume喚醒執行緒,並且傳入4,5。遇到yeild,掛起程式,返回a+b。所以輸出“true 9”。
  • 第二次呼叫resume喚醒執行緒,並且傳入7,8。此時回到上次掛起的位置,並將賦值給a,b。相當於local a,b = 7,8

重構版

為了更好的理解重構版,花了大量時間講解Lua的協同式多執行緒 ,接下來進入正題

先寫一個測試程式

module(..., package.seeall)


sys.taskInit(function()
    cnt = 0
    while true do
        cnt = cnt + 1
        print("task_A_cnt: ", cnt)
        sys.wait(1000)
    end
end)

sys.taskInit(function()
    cnt = 0
    while true do
        cnt = cnt + 1
        print("task_B_cnt: ", cnt)
        sys.wait(2000)
    end
end)

輸出結果,只摘抄了一小部分

task_B_cnt:     132
task_A_cnt:     133
task_A_cnt:     134
task_B_cnt:     135
task_A_cnt:     136
task_A_cnt:     137
task_B_cnt:     138
task_A_cnt:     139
task_A_cnt:     140
task_B_cnt:     141
task_A_cnt:     142

該測試程式總共建立了2個任務,第一個任務每次加1,掛起1000ms,第二個任務每次加1,掛起2000ms,所以最後的輸出為:輸出兩次task_A_cnt, 輸出一次task_B_cnt。如果在微控制器上習慣寫UCOS或者FreeRTOS的開發者看到這樣的結構肯定不會陌生。

首先呼叫sys.taskInit建立任務,任務體的格式為

sys.taskInit(function()
    xxxx
    while true do
        xxxxx
        sys.wait(100)
    end
end)

還有一種為

local function xxxx(...) 
    xxxx
end
sys.taskInit(xxxx,...)

和UCOS,FreeRTOS的任務體大致相同,一個while死迴圈,然後通過延時切換任務。

接下來分析一下sys.taskInit和sys.wait兩個重要的函式

先看sys.taskInit的原始碼

function taskInit(fun, ...)
    local co = coroutine.create(fun)
    coroutine.resume(co, unpack(arg))
    return co
end

sys.taskInit實際是封裝了coroutine.createcoroutine.resume。建立一個任務執行緒,並執行該執行緒,返回執行緒號。

再看sys.wait

function wait(ms)
    -- 引數檢測,引數不能為負值
    assert(ms > 0, "The wait time cannot be negative!")
    -- 選一個未使用的定時器ID給該任務執行緒
    if taskTimerId >= TASK_TIMER_ID_MAX then taskTimerId = 0 end
    taskTimerId = taskTimerId + 1
    local timerid = taskTimerId
    taskTimerPool[coroutine.running()] = timerid
    timerPool[timerid] = coroutine.running()
    -- 呼叫core的rtos定時器
    if 1 ~= rtos.timer_start(timerid, ms) then log.debug("rtos.timer_start error") return end
    -- 掛起呼叫的任務執行緒
    local message, data = coroutine.yield()
    if message ~= nil then
        rtos.timer_stop(timerid)
        taskTimerPool[coroutine.running()] = nil
        timerPool[timerid] = nil
        return message, data
    end
end

如何將定時器和任務組織起來的呢?其中最重要的就是taskTimerPool,timerPool這兩個表。在此之前我們得每個執行緒的執行緒號都是唯一不變的。

程式流程:

  • 檢測定時時間是否正確
  • 判斷定時器是否用完,如果沒有,則分配一個未使用的定時器ID給該任務執行緒
  • 定時器ID加1
  • 以執行緒號為下標儲存定時器ID號到taskTimerPool表中
  • 以定時器ID號為下標儲存執行緒號ID到timerPool表中
  • 開啟定時器

這樣描述比較抽象,舉個例子會更好理解一點

sys.taskInit(function()
    cnt = 0
    while true do
        print("task: ", 1)
        sys.wait(100)
    end
end)

以這個簡單的例子來解釋

sys.taskInit建立並執行該執行緒,進入sys.wait函式,taskTimerId的初始值為0,所以+1,taskTimerId=1,coroutine.running()會返回正在執行的任務的執行緒號,也就是當前任務的執行緒號,比如該例中為0x8218dbc0。注意:執行緒號是唯一不會改變的。所以taskTimerPool[0x8218dbc0] = 1,timerPool[1] = 0x8218dbc0。這樣就將定時器ID和執行緒號聯絡起來了。然後開啟定時器,掛起該任務,執行下一任務。

問題來了,定時器達到定時時間的時候怎麼處理呢?

看下面的程式碼

function run()
    while true do
        -- 分發內部訊息
        dispatch()
        -- 阻塞讀取外部訊息
        local msg, param = rtos.receive(rtos.INF_TIMEOUT)
        -- 判斷是否為定時器訊息,並且訊息是否註冊
        if msg == rtos.MSG_TIMER and timerPool[param] then
            if param < TASK_TIMER_ID_MAX then
                local taskId = timerPool[param]
                timerPool[param] = nil
                if taskTimerPool[taskId] == param then
                    taskTimerPool[task  Id] = nil
                    coroutine.resume(taskId)
                end
            else
                local cb = timerPool[param]
                timerPool[param] = nil
                if para[param] ~= nil then
                    cb(unpack(para[param]))
                else
                    cb()
                end
            end
        --其他訊息(音訊訊息、充電管理訊息、按鍵訊息等)
        elseif type(msg) == "number" then
            handlers[msg](param)
        else
            handlers[msg.id](msg)
        end
    end
end

讀取外部訊息,當定時器達到定時時間後,會發生一個訊息。下面摘抄至wiki(有問題有多去wiki查詢)

2

所以,msg為rtos.MSG_TIMER,param為定時器ID號。

  • 判斷是否為任務開啟的定時器,若是,判斷定時器ID是否超過最大值
  • 根據timerPool獲取執行緒號並清除
  • 如果能在taskTimerPool中找到定時器ID和任務號對應,則喚醒該執行緒

這樣,我們就能實現任務與任務之間的排程了。

**