skynet源碼分析之socketchannel
請求回應模式是與外部交互最常用的模式之一。通常協議設計方式有兩種:1.每個請求包對應一個回應包,有tcp保證時序,先請求的先回應,但不必收到回應才發送下一個請求,redis的協議就是這種類型;2.每個請求帶一個唯一的session標識,回應包也帶這個標識。這樣每個請求不一定都需要回應,且不用遵循先請求先回應的時序。mongodb的協議就是這種類型。skynet提供socketchannel庫封裝內部細節,支持上面兩種模式。詳情參考官方wiki https://github.com/cloudwu/skynet/wiki/SocketChannel
調用socketchannel.channel創建一個channel對象,必須提供ip地址(可以是域名)和端口。采用第一種還是第二種模式依據是否提供response參數,redis沒有提供說明用的第一種模式,mongo提供了(第13行)說明用第二種模式。
1 -- lualib/skynet/db/redis.lua 2 local channel = socketchannel.channel { 3 host = db_conf.host, 4 port = db_conf.port or 6379, 5 auth = redis_login(db_conf.auth, db_conf.db), 6 nodelay = true, 7 } 8 9 -- lualib/skynet/db/mongo.lua 10 obj.__sock = socketchannel.channel {11 host = obj.host, 12 port = obj.port, 13 response = dispatch_reply, 14 auth = mongo_auth(obj), 15 backup = backup, 16 nodelay = true, 17 } 18 19 -- lualib/skynet/socketchannel.lua 20 function socket_channel.channel(desc) 21 local c = { 22 __host = assert(desc.host), 23 __port = assert(desc.port), 24 __backup = desc.backup, 25 __auth = desc.auth, 26 __response = desc.response, -- It‘s for session mode 27 __request = {}, -- request seq { response func or session } -- It‘s for order mode 28 __thread = {}, -- coroutine seq or session->coroutine map 29 __result = {}, -- response result { coroutine -> result } 30 __result_data = {}, 31 __connecting = {}, 32 __sock = false, 33 __closed = false, 34 __authcoroutine = false, 35 __nodelay = desc.nodelay, 36 } 37 38 return setmetatable(c, channel_meta) 39 end
創建完對象後,可以手動調用connect連接對端,如果不connect,在第一次發送請求的時候會嘗試去連接。最終調用到connect_once,
第7行,調用socket庫api連接對端
第11行,fork一個協程專門處理收到回應包
15-21行,如果是模式1,收到回應包後的處理函數是dispatch_by_order,模式2則是dispatch_by_session
1 -- lualib/skynet/socketchannel.lua 2 local function connect_once(self) 3 if self.__closed then 4 return false 5 end 6 assert(not self.__sock and not self.__authcoroutine) 7 local fd,err = socket.open(self.__host, self.__port) 8 ... 9 10 self.__sock = setmetatable( {fd} , channel_socket_meta ) 11 self.__dispatch_thread = skynet.fork(dispatch_function(self), self) 12 ... 13 end 14 15 local function dispatch_function(self) 16 if self.__response then 17 return dispatch_by_session 18 else 19 return dispatch_by_order 20 end 21 end
接下來先介紹發送請求包的流程,之後再介紹如何處理回應包。調用者通過channel:request發送請求包,該接口有三個參數:參數request請求包數據;參數response在模式1下是一個function用來接收回應包,模式2下是一個唯一的session值;參數padding可選,表示將巨大消息拆分成多個小包發送出去。
第2行,檢測是否已連接,如果未連接,會嘗試去連接
第8行,調用socket庫把發送請求包。
第13-16行,不需要回應直接返回。
第18,23,35-48行,保存當前co。如果是模式2,保留session-co映射關系在self.__thread裏(38行);如果是模式1,保留response函數在self.__request裏,co在self.__threaad裏(41,42行)。
43-46行,如果有暫停的co在等待回應包,重啟它。
第24行,暫停當前co,等待對方回應包。當收到回應包時,回應處理函數會重啟它。
25-32行,返回結果給調用者。
1 function channel:request(request, response, padding) 2 assert(block_connect(self, true)) -- connect once 3 local fd = self.__sock[1] 4 5 if padding then 6 ... 7 else 8 if not socket_write(fd , request) then 9 sock_err(self) 10 end 11 end 12 13 if response == nil then 14 -- no response 15 return 16 end 17 18 return wait_for_response(self, response) 19 end 20 21 local function wait_for_response(self, response) 22 local co = coroutine.running() 23 push_response(self, response, co) 24 skynet.wait(co) 25 26 local result = self.__result[co] 27 self.__result[co] = nil 28 local result_data = self.__result_data[co] 29 self.__result_data[co] = nil 30 ... 31 32 return result_data 33 end 34 35 local function push_response(self, response, co) 36 if self.__response then 37 -- response is session 38 self.__thread[response] = co 39 else 40 -- response is a function, push it to __request 41 table.insert(self.__request, response) 42 table.insert(self.__thread, co) 43 if self.__wait_response then 44 skynet.wakeup(self.__wait_response) 45 self.__wait_response = nil 46 end 47 end 48 end
對於模式1的回應處理函數dispatch_by_order,
第4行,調用pop_response獲取第一個未回應的請求包的response和co
第6行,調用response函數,response函數調用socket庫的readline/read(24行)來等待socket上的返回,是一個阻塞操作。等socket返回後,response函數返回
第11-16行,返回結果保存在self.__result_data
第17行,重啟調用者發送請求包的co,把結果返回給調用者(上面代碼的26-32行),至此完成一次與對端請求回應交互
1 -- lualib/skynet/socketchannel.lua 2 local function dispatch_by_order(self) 3 while self.__sock do 4 local func, co = pop_response(self) 5 ... 6 local ok, result_ok, result_data, padding = pcall(func, self.__sock) 7 if ok then 8 if padding and result_ok then 9 ... 10 else 11 self.__result[co] = result_ok 12 if result_ok and self.__result_data[co] then 13 table.insert(self.__result_data[co], result_data) 14 else 15 self.__result_data[co] = result_data 16 end 17 skynet.wakeup(co) 18 end 19 end 20 end 21 22 -- lualib/skynet/db/redis.lua 23 local function read_response(fd) 24 local result = fd:readline "\r\n" 25 local firstchar = string.byte(result) 26 local data = string.sub(result,2) 27 return redcmd[firstchar](fd,data) 28 end
對於模式2的回應處理函數dispatch_by_session,
第6行,調用response函數,response函數會調用socket庫的readline/read(30行)來等待socket上的返回,是一個阻塞操作。等socket返回後,response函數返回回應包(回應包包含唯一的session)
第8行,通過session獲取對應的co
第13-21行,接下來處理跟上面一樣,保存回應包內容,重啟co。
1 -- lualib/skynet/socketchannel.lua 2 local function dispatch_by_session(self) 3 local response = self.__response 4 -- response() return session 5 while self.__sock do 6 local ok , session, result_ok, result_data, padding = pcall(response, self.__sock) 7 if ok and session then 8 local co = self.__thread[session] 9 if co then 10 if padding and result_ok then 11 ... 12 else 13 self.__thread[session] = nil 14 self.__result[co] = result_ok 15 if result_ok and self.__result_data[co] then 16 table.insert(self.__result_data[co], result_data) 17 else 18 self.__result_data[co] = result_data 19 end 20 skynet.wakeup(co) 21 end 22 else 23 self.__thread[session] = nil 24 skynet.error("socket: unknown session :", session) 25 end 26 end 27 28 -- lualib/skynet/db/mongo.lua 29 local function dispatch_reply(so) 30 local len_reply = so:read(4) 31 local reply = so:read(driver.length(len_reply)) 32 ... 33 return reply_id, succ, result 34 end
skynet源碼分析之socketchannel