1. 程式人生 > >Lua基礎 coroutine —— Lua的多執行緒程式設計

Lua基礎 coroutine —— Lua的多執行緒程式設計

Lua的coroutinethread 的概念比較相似,但是也不完全相同。一個multi-thread的程式,可以同時有多個thread 在執行,但是一個multi-coroutines的程式,同一時間只能有一個coroutine 在執行,而且當前正在執行的coroutine 只有在被顯式地要求掛起時,才會掛起。Lua的coroutine 是一個強大的概念,儘管它的幾個主要應用都比較複雜。

1. Coroutine 基礎

Lua將coroutine相關的所有函式封裝在表coroutine 中。create 函式,建立一個coroutine ,以該coroutine 將要執行的函式作為引數,返回型別為thread 。


coroutine 有4個不同的狀態:suspended, running, dead, normal。當新create 一個coroutine的時候,它的狀態為suspended ,意味著在create 完成後,該coroutine 並沒有立即執行。我們可以用函式status 來檢視該coroutine 的狀態:


函式coroutine.resume (恢復)執行該coroutine,將其狀態從suspended變為running:


在該示例中,該coroutine執行,簡單地輸出一個“hi”就結束了,該coroutine變為dead狀態:


到目前為止,coroutine看起來好像也就這麼回事,類似函式呼叫,但是更復雜的函式呼叫。但是,coroutine的真正強大之處在於它的yield

函式,它可以將正在執行的coroutine 掛起,並可以在適當的時候再重新被喚醒,然後繼續執行。下面,我們先看一個簡單的示例:


我們一步一步來講,該coroutine每列印一行,都會被掛起,看起來是不是在執行yield 函式的時候被掛起了呢?當我們用resume 喚醒該coroutine時,該coroutine繼續執行,打印出下一行。直到最後沒有東西打印出來的時候,該coroutine退出迴圈,變為dead狀態(注意最後那裡的狀態變化)。如果對一個dead狀態的coroutine進行resume 操作,那麼resume會返回false+err_msg,如上面最後兩行所示。

注意,resume 是執行在protected mode下。當coroutine內部發生錯誤時,Lua會將錯誤資訊返回給resume

呼叫。

當一個coroutine A在resume另一個coroutine B時,A的狀態沒有變為suspended,我們不能去resume它;但是它也不是running狀態,因為當前正在running的是B。這時A的狀態其實就是normal 狀態了。

Lua的一個很有用的功能,resume-yield對,可以用來交換資料。下面是4個小示例:

1)main函式中沒有yield,呼叫resume時,多餘的引數,都被傳遞給main函式作為引數,下面的示例,1 2 3分別就是a b c的值了:


2)main函式中有yield,所有被傳遞給yield的引數,都被返回。因此resume的返回值,除了標誌正確執行的true外,還有傳遞給yield的引數值:


3)yield也會把多餘的引數返回給對應的resume,如下:


為啥第一個resume沒有任何輸出呢?我的答案是,yield沒有返回,print就根本還沒執行。

4)當一個coroutine結束的時候,main函式的所有返回值都被返回給resume:


我們在同一個coroutine中,很少會將上面介紹的這些功能全都用上,但是所有這些功能都是很useful的。

目前為止,我們已經瞭解了Lua中coroutine的一些知識了。下面我們需要明確幾個概念。Lua提供的是asymmetric coroutine,意思是說,它需要一個函式(yield)來掛起一個coroutine,但需要另一個函式(resume)來喚醒這個被掛起的coroutine。對應的,一些語言提供了symmetric coroutine,用來切換當前coroutine的函式只有一個。

有人想把Lua的coroutine稱為semi-coroutine,但是這個詞已經被用作別的意義了,用來表示一個被限制了一些功能來實現出來的coroutine,這樣的coroutine,只有在一個coroutine的呼叫堆疊中,沒有剩餘任何掛起的呼叫時,才會被掛起,換句話說,就是隻有main可以掛起。Python中的generator好像就是這樣一個類似的semi-coroutine。

跟asymmetric coroutine和symmetric coroutine的區別不同,coroutine和generator(Python中的)的不同在於,generator並麼有coroutine的功能強大,一些用coroutine可實現的有趣的功能,用generator是實現不了的。Lua提供了一個功能完整的coroutine,如果有人喜歡symmetric coroutine,可以自己簡單的進行一下封裝。

2. pipes和filters

couroutine的一個典型的例子就是producer-consumer問題。我們來假設有這樣兩個函式,一個不停的produce一些值出來(例如從一個file中不停地讀),另一個不斷地consume這些值(例如,寫入到另一個file中)。這兩個函式的樣子應該如下:

function producer ()
    while true do
        local x = io.read() -- produce new value
        send(x) -- send to consumer
    end
end
function consumer ()
    while true do
        local x = receive() -- receive from producer
        io.write(x, "\n") -- consume new value
    end
end

這兩個函式都不停的在執行,那麼問題來了,怎麼來匹配send和recv呢?究竟誰先誰後呢?

coroutine提供瞭解決上面問題的一個比較理想的工具resume-yield。我們還是不說廢話,先看看程式碼再來說說我自己的理解:

function receive (prod)
    local status, value = coroutine.resume(prod)
    return value
end

function send (x)
    coroutine.yield(x)
end

function producer()
    return coroutine.create(function ()
        while true do
            local x = io.read() -- produce new value
            send(x)
        end
    end)
end
	
function consumer (prod)
    while true do
        local x = receive(prod) -- receive from producer
        io.write(x, "\n") -- consume new value
    end
end

p = producer()
consumer(p)

程式先呼叫consumer, 然後recv函式去resume喚醒producer,produce一個值,send給consumer,然後繼續等待下一次resume喚醒。看下下面的這個示例應該就很明白了:


我們可以繼續擴充套件一下上面的例子,增加一個filter,在producer和consumer之間做一些資料轉換啥的。那麼filter裡都做些什麼呢?我們先看一下沒加filter之前的邏輯,基本就是producer去send,send to consumer,consumer去recv,recv from producer,可以這麼理解吧。加了filter之後呢,因為filter需要對data做一些轉換操作,因此這時的邏輯為,producer去send,send tofilter,filter去recv,recv from producer,filter去send,send to consumerconsumer去recv,recv fromfilter。紅色的部分是跟原來不同的。此時的程式碼如下:

function send(x)
    coroutine.yield(x)
end
 
function producer()
    return coroutine.create(function ()
        while true do
            local x = io.read()
            send(x)
        end 
    end)
end
 
function consumer(prod)
    while true do
        local x = receive(prod)
        if x then
            io.write(x, '\n')
        else
            break
        end 
    end 
end
 
function filter(prod)                                                                                                              
    return coroutine.create(function ()
        for line = 1, math.huge do
            local x = receive(prod)
            x = string.format('%5d %s', line, x)
            send(x)
        end 
    end)
end
 
p = producer()
f = filter(p)
consumer(f)

看完上面的例子,你是否想起了unix中的pipe?coroutine怎麼說也是multithreading的一種。使用pipe,每個task得以在各自的process裡執行,而是用coroutine,每個task在各自的coroutine中執行。pipe在writer(producer)和reader(consumer)之間提供了一個buffer,因此相對的執行速度還是相當可以的。這個是pipe很重要的一個特性,因為process間通訊,代價還是有點大的。使用coroutine,不同task之間的切換成本更小,基本上也就是一個函式呼叫,因此,writer和reader幾乎可以說是齊頭並進了啊。

3. 用coroutine實現迭代器

我們可以把迭代器 迴圈看成是一個特殊的producer-consumer例子:迭代器produce,迴圈體consume。下面我們就看一下coroutine為我們提供的強大的功能,用coroutine來實現迭代器。

我們來遍歷一個數組的全排列。先看一下普通的loop實現,程式碼如下:

function printResult(a)
    for i = 1, #a do
        io.write(a[i], ' ')
    end 
    io.write('\n')
end
 
function permgen(a, n)                                                                                                             
    n = n or #a
    if n <= 1 then
        printResult(a)
    else
        for i = 1, n do
            a[n], a[i] = a[i], a[n]
            permgen(a, n-1)
            a[n], a[i] = a[i], a[n]
        end 
    end 
end
 
permgen({1,2,3})

執行結果如下:


再看一下迭代器實現,注意比較下程式碼的改變的部分:

function printResult(a)
    for i = 1, #a do
        io.write(a[i], ' ')
    end 
    io.write('\n')
end  
        
function permgen(a, n)
    n = n or #a
    if n <= 1 then
       coroutine.yield(a) 
    else
        for i = 1, n do
            a[n], a[i] = a[i], a[n]
            permgen(a, n-1)
            a[n], a[i] = a[i], a[n]
        end 
    end 
end  
        
function permutations(a)
    local co = coroutine.create(function () permgen(a) end)                                                                        
    return function ()
        local code, res = coroutine.resume(co)
        return res 
    end 
end  
        
for p in permutations({"a", "b", "c"}) do
    printResult(p)
end 

執行結果如下:


permutations 函式使用了一個Lua中的常規模式,將在函式中去resume一個對應的coroutine進行封裝。Lua對這種模式提供了一個函式coroutine.wap 。跟create 一樣,wrap 建立一個新的coroutine ,但是並不返回給coroutine,而是返回一個函式,呼叫這個函式,對應的coroutine就被喚醒去執行。跟原來的resume 不同的是,該函式不會返回errcode作為第一個返回值,一旦有error發生,就退出了(類似C語言的assert)。使用wrap, permutations可以如下實現:

function permutations (a)
    return coroutine.wrap(function () permgen(a) end)
end

wrap 比create 跟簡單,它實在的返回了我們最需要的東西:一個可以喚醒對應coroutine的函式。 但是不夠靈活。沒有辦法去檢查wrap 建立的coroutine的status, 也不能檢查runtime-error(沒有返回errcode,而是直接assert)。

4. 非搶佔式多執行緒

從我們前面所寫的可以看到,coroutine執行一系列的協作的多執行緒。每個coroutine相當於一個thread。一個yield-resume對可以在不同的thread之間切換控制權。但是,跟常規的multithr不同,coroutine是非搶佔式的。一個coroutine在執行的時候,不可能被其他的coroutine從外部將其掛起,只有由其本身顯式地呼叫yield才會掛起,並交出控制權。對一些程式來說,這沒有任何問題,相反,因為非搶佔式的緣故,程式變得更加簡單。我們不需要擔心同步問題的bug,因為在threads之間的同步都是顯式的。我們只需要保證在對的時刻呼叫yield就可以了。

但是,使用非搶佔式multithreading,不管哪個thread呼叫了一個阻塞的操作,那麼整個程式都會被阻塞,這是不能容忍的。由於這個原因,很多程式設計師並不認為coroutine可以替代傳統的multithreading。但是,下面我們可以看到一個有趣的解決辦法。

一個很典型的multithreading場景:通過http下載多個remote files。我們先來看下如何下載一個檔案,這需要使用LuaSocket庫,如果你的開發環境沒有這個庫的話,可以看下博主的另一篇文章Lua基礎 安裝LuaSocket,瞭解下如何在Linux上安裝LuaSocket. 下載一個file的lua程式碼如下:

require("socket")

host = "www.w3.org"
file = "/standards/xml/schema"

c = assert(socket.connect(host, 80))
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n") -- 注意GET後和HTTP前面的空格

while true do
    local s, status, partial = c:receive(2^10)
    io.write(s or partial)
    if status == "closed" then
        break
    end
end

c:close()

執行結果有點長,不方便截圖,就不貼了。

現在我們就知道怎麼下載一個檔案了。現在回到前面說的下載多個remote files的問題。當我們接收一個remote file的時候,程式花費了大多數時間去等待資料的到來,也就是在receive函式的呼叫是阻塞。因此,如果能夠同時下載所有的files,那麼程式的執行速度會快很多。下面我們看一下如何用coroutine來模擬這個實現。我們為每一個下載任務建立一個thread,在一個thread沒有資料可用的時候,就呼叫yield 將程式控制權交給一個簡單的dispatcher,由dispatcher來喚醒另一個thread。下面我們先把之前的程式碼寫成一個函式,但是有少許改動,不再將file的內容輸出到stdout了,而只是間的的輸出filesize。

function download(host, file)
    local c = assert(socket.connect(host, 80))
    local count = 0  --  counts number of bytes read
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
    while true do
        local s, status, partial = receive(c)
        count = count + #(s or partial)
        if status == "closed" then
            break
        end 
    end 
    c:close()
    print(file, count)
end

上面程式碼中有個函式receive ,相當於下載單個檔案中的實現如下:

function receive (connection)
    return connection:receive(2^10)
end

但是,如果要同時下載多檔案的話,這個函式必須非阻塞地接收資料。在沒有資料接收的時候,就呼叫yield掛起,交出控制權。實現應該如下:

function receive(connection)   
    connection:settimeout(0)  -- do not block          
    local s, status, partial = connection:receive(2^10)
    if status == "timeout" then
        coroutine.yield(connection)
    end                        
    return s or partial, status
end

settimeout(0)將這個連線設為非阻塞模式。當status變為“timeout”時,意味著該操作還沒完成就返回了,這種情況下,該thread就yield。傳遞給yield的non-false引數,告訴dispatcher該執行緒仍然在執行。注意,即使timeout了,該連線還是會返回它已經收到的東西,存在partial變數中。

下面的程式碼展示了一個簡單的dispatcher。表threads儲存了一系列的執行中的thread。函式get 確保每個下載任務都單獨一個thread。dispatcher本身是一個迴圈,不斷的遍歷所有的thread,一個一個的去resume。如果一個下載任務已經完成,一定要將該thread從表thread中刪除。當沒有thread在執行的時候,迴圈就停止了。

最後,程式建立它需要的threads,並呼叫dispatcher。例如,從w3c網站下載四個文件,程式如下所示:

require "socket"

function receive(connection)
    connection:settimeout(0)  -- do not block
    local s, status, partial = connection:receive(2^10)
    if status == "timeout" then
        coroutine.yield(connection)
    end
    return s or partial, status
end

function download(host, file)
    local c = assert(socket.connect(host, 80))
    local count = 0  --  counts number of bytes read
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
    while true do
        local s, status, partial = receive(c)
        count = count + #(s or partial)
        if status == "closed" then
            break
        end
    end
    c:close()
    print(file, count)
end

threads = {}  -- list of all live threads

function get(host, file)
    -- create coroutine
    local co = coroutine.create(function ()
        download(host, file)
    end)
    -- intert it in the list
    table.insert(threads, co)
end

function dispatch()
    local i = 1
    while true do
        if threads[i] == nil then  -- no more threads?
            if threads[1] == nil then -- list is empty?
                break
            end
            i = 1  -- restart the loop
        end
        local status, res = coroutine.resume(threads[i])
        if not res then   -- thread finished its task?
            table.remove(threads, i)
        else
            i = i + 1
        end
    end
end

host = "www.w3.org"
get(host, "/TR/html401/html40.txt")
get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host, "/TR/REC-html32.html")
get(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
dispatch() -- main loop

我的程式運行了10s左右,4個檔案已經下載完成,執行結果如下:


我又重新用阻塞式的順序下載重試了一下,需要時間12s多一點,可能檔案比較小,也不夠多,對比不是很明顯,阻塞的多檔案下載程式碼如下,其實就是上面幾段程式碼放在一塊了

function receive (connection)
    return connection:receive(2^10)
end

function download(host, file)
    local c = assert(socket.connect(host, 80))
    local count = 0  --  counts number of bytes read
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
    while true do
        local s, status, partial = receive(c)
        count = count + #(s or partial)
        if status == "closed" then
            break
        end 
    end 
    c:close()
    print(file, count)
end

require "socket"

host = "www.w3.org"

download(host, "/TR/html401/html40.txt")
download(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
download(host, "/TR/REC-html32.html")
download(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")

執行結果如下,跟上面的非阻塞式有點不同,下載完成的順序,就是程式碼中寫的順序:


既然速度沒有明顯的更快,那麼有沒有優化空間呢,答案是,有。當沒有thread有資料接收時,dispatcher遍歷了每一個thread去看它有沒有資料過來,結果這個過程比阻塞式的版本多耗費了30倍的cpu。

為了避免這個情況,我們使用LuaSocket提供的select函式。它執行程式在等待一組sockets狀態改變時阻塞。程式碼改動比較少,在迴圈中,收集timeout的連線到表connections 中,當所有的連線都timeout了,dispatcher呼叫select 來等待這些連線改變狀態。該版本的程式,在博主開發環境測試,只需7s不到,就下載完成4個檔案,除此之外,對cpu的消耗也小了很多,只比阻塞版本多一點點而已。新的dispatch程式碼如下:

function dispatch()
    local i = 1 
    local connections = {}
    while true do
        if threads[i] == nil then  -- no more threads?
            if threads[1] == nil then -- list is empty?
                break
            end 
            i = 1  -- restart the loop
            connections = {}
        end       
        local status, res = coroutine.resume(threads[i])
        if not res then   -- thread finished its task?
            table.remove(threads, i)
        else   
            i = i + 1 
            connections[#connections + 1] = res 
            if #connections == #threads then   -- all threads blocked?
                socket.select(connections)
            end                                                                                                                    
        end       
    end           
end

執行結果如下:


這邊文章又是斷斷續續寫了幾天,文章的每個例子都是親自執行過的,今天終於寫完,養精蓄銳,明天開始去泰國旅行幾天,希望有一個開心的行程。

水平有限,如果有朋友發現錯誤,歡迎留言交流