skynet框架應用 (十三) 閘道器服務
skynet 提供了一個通用模板 lualib/snax/gateserver.lua 來啟動一個閘道器伺服器,通過 TCP 連線和客戶端交換資料。
TCP 基於資料流,但一般我們需要以帶長度資訊的資料包的結構來做資料交換。gateserver 做的就是這個工作,把資料流切割成包的形式轉發到可以處理它的地址。
local gateserver = require "snax.gateserver"
local handler = {} --必須提供一張表,表裡面定義connect、message等相關回調函式
-- register handlers here
gateserver.start(handler) --閘道器服務的入口函式
13.1最簡單閘道器服務
13.1.1 編寫mygateserver.lua
示例程式碼:mygateserver.lua
local skynet = require "skynet"
local gateserver = require "snax.gateserver"
local handler = {}
--當一個客戶端連結進來,gateserver自動處理連結,並且呼叫該函式,必須要有
function handler.connect(fd, ipaddr)
skynet.error("ipaddr:" ,ipaddr,"fd:",fd,"connect")
gateserver.openclient(fd) --連結成功不代表馬上可以讀到資料,需要開啟這個套接字,允許fd接收資料
end
--當一個客戶端斷開連結後呼叫該函式,必須要有
function handler.disconnect(fd)
skynet.error("fd:", fd, "disconnect")
end
--當fd有資料到達了,會呼叫這個函式,前提是fd需要呼叫gateserver.openclient開啟
function handler.message(fd, msg, sz)
skynet.error ("recv message from fd:", fd)
end
gateserver.start(handler)
13.1.2 啟動mygateserver
可以使用普通服務建立方式來建立一個mygateserver服務,但是這個服務啟動後,並不能馬上開始工作,
需要你給mygateserver傳送一個lua訊息open並且告訴gateserver監聽的埠、最大連線數、延時等資訊來開啟mygateserver服務。
程式碼如下openmygateserver.lua
local skynet = require "skynet"
skynet.start(function()
skynet.error("Server start")
local gateserver = skynet.newservice("mygateserver") --啟動剛才寫的閘道器服務
skynet.call(gateserver, "lua", "open", { --需要給閘道器服務傳送open訊息,來啟動監聽
port = 8002,--監聽的埠
maxclient = 64,--客戶端最大連線數
nodelay = true,--是否延遲TCP
})
skynet.error("gate server setup on", 8002)
skynet.exit()
end)
執行結果:
$ ./skynet examples/config openmygateserver #啟動openmygateserver [:01000010] LAUNCH snlua openmygateserver [:01000010] Server start [:01000012] LAUNCH snlua mygateserver #啟動mygateserver [:01000012] Listen on 0.0.0.0:8002 #開始監聽埠8002 [:01000010] gate server setup on 8002 [:01000010] KILL self#openmygateserver退出
重開一個終端,啟動一個C語言的socketclient客戶端(在程式碼在9.2中)去連線8002埠,觀察skynet服務情況:
$ ./skynet examples/config openmygateserver [:01000010] LAUNCH snlua openmygateserver [:01000010] Server start [:01000012] LAUNCH snlua mygateserver [:01000012] Listen on 0.0.0.0:8002 [:01000010] gate server setup on 8002 [:01000010] KILL self [:01000012] ipaddr: 127.0.0.1:48008 fd: 9 connect #連線進入 [:01000012] fd: 9 disconnect #斷開連線
上面的結果可以看連線與斷開連線都能執行,但是handler.message並沒有執行。這是由於snax.gateserver
基於TCP協議包裝了一個兩位元組資料長度的協議。而現在版本的socketclient並沒有按照這種協議傳送。
13.2 gateserver應用協議
13.2.1 兩字資料長度協議
gateserver應用協議是基於TCP協議做了一層簡單的封裝,前兩個位元組表示資料包的長度len(不計算這兩個表示長度的位元組),高位元組在前低位元組在後(大端序),後面緊跟len位元組數的資料。例如:
\x00\x05 \x31\x32\x33\x34\x35 | | len data
由於只用兩位元組表示資料長度,那麼這個包的data最大隻能是65535位元組。這種協議包方式可以解決TCP粘包的問題,也是TCP通訊當中最常用的一種應用層協議包定義方式。
所以如果想通過TCP與gateserver通訊必須要按照這種協議進行組包解包。否則gateserver肯定是不識別的。
13.2.2 打包與解包
打包與解包TCP網路資料可以使用skynet.netpack庫
local netpack = require "skynet.netpack" --使用netpack
--打包資料str,返回一個C指標msg,sz,申請記憶體
netpack.pack(str)
--解包資料,返回一個lua的字串,會釋放記憶體
netpack.tostring(msg, sz)
13.2.3 client使用長度協議發包
下面我們通過改寫socketclient.c檔案來組包傳送資料:
void* readthread(void* arg) { pthread_detach(pthread_self()); int sockfd = (int)arg; int n = 0; char buf[MAXLINE]; while (1) { n = read(sockfd, buf, MAXLINE); if (n == 0) { printf("the other side has been closed.\n"); close(sockfd); exit(0); } else write(STDOUT_FILENO, buf, n); } return (void*)0; } int main(int argc, char *argv[]) { if(argc != 2) { printf("usage:%s port", argv[0]); return -1; } int port = atoi(argv[1]); struct sockaddr_in servaddr; int sockfd; short size, nsize; char buf[MAXLINE]; unsigned char sendbuf[MAXLINE]; sockfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr); servaddr.sin_port = htons(port); connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); pthread_t thid; pthread_create(&thid, NULL, readthread, (void*)sockfd); while (fgets(buf, MAXLINE, stdin) != NULL) { size = (short)strlen(buf); //計算需要傳送的資料包長度 nsize = htons(size);//轉換成大端序 memcpy(sendbuf, &nsize, sizeof(nsize)); //nsize先填入sendbuf memcpy(sendbuf+sizeof(nsize), buf, size); //再填入buf內容 write(sockfd, sendbuf, size + sizeof(nsize)); } close(sockfd); return 0; }
先執行skynet服務,然後客戶端執行:
$ gcc socketclient.c -o socketclient -lpthread ./socketclient 8002
切回skynet執行的終端:
$ ./skynet examples/config testmygateserver [:01000010] LAUNCH snlua testmygateserver [:01000010] Server start [:01000012] LAUNCH snlua mygateserver [:01000012] Listen on 0.0.0.0:8002 [:01000010] gate server setup on 8002 [:01000010] KILL self [:01000012] ipaddr: 127.0.0.1:48012 fd: 9 connect [:01000012] recv message from fd: 9 #已經呼叫handler.message
13.2.4 gateserver解包
上面的實驗中mygateserver.lua中的handler.message已經被呼叫,並且告訴fd為9的套接字發來了資料,msg表示C資料指標,sz表示資料長度,在lua中無法直接使用,需要轉換成lua可識別的資料,需要呼叫netpack.tostring
函式來解包。下面來改寫一下mygateserver.lua:
local skynet = require "skynet"
local gateserver = require "snax.gateserver"
local netpack = require "skynet.netpack" --使用netpack
local handler = {}
--當一個客戶端連結進來,gateserver自動處理連結,並且呼叫該函式
function handler.connect(fd, ipaddr)
skynet.error("ipaddr:",ipaddr,"fd:",fd,"connect")
gateserver.openclient(fd)
end
--當一個客戶端斷開連結後呼叫該函式
function handler.disconnect(fd)
skynet.error("fd:", fd, "disconnect")
end
--接收訊息
function handler.message(fd, msg, sz)
skynet.error("recv message from fd:", fd)
skynet.error(netpack.tostring(msg, sz)) --把 handler.message 方法收到的 msg,sz 轉換成一個 lua string,並釋放 msg 佔用的 C 記憶體。
end
gateserver.start(handler)
執行結果:
$ ./skynet examples/config openmygateserver [:01000010] LAUNCH snlua openmygateserver [:01000010] Server start [:01000012] LAUNCH snlua mygateserver [:01000012] Listen on 0.0.0.0:8002 [:01000010] gate server setup on 8002 [:01000010] KILL self [:01000012] ipaddr: 127.0.0.1:48018 fd: 9 connect [:01000012] recv message from fd: 9 [:01000012] aaaaaaaaaaa #正常接收到資料。 [:01000012] fd: 9 disconnect
需要注意的是:msg是一個C指標指向了一塊堆空間,如果你不進行任何處理,那麼也要呼叫skynet.trash來釋放底層的記憶體。
13.3 控制客戶端連線數
閘道器服務最重要的任務就是控制客戶端連線數,避免大量客戶登入到這個服務上。
修改openmygateserver.lua
local skynet = require "skynet"
skynet.start(function()
skynet.error("Server start")
local gateserver = skynet.newservice("mygateserver")
skynet.call(gateserver, "lua", "open", {
port = 8002,--監聽的埠
maxclient = 2,--客戶端最大連線數改為2個
nodelay = true,--是否延遲TCP
})
skynet.error("gate server setup on", 8002)
skynet.exit()
end)
執行openmygateserver.lua,再執行三個socketclient,結果如下:
openmygateserver [:0100000a] LAUNCH snlua openmygateserver [:0100000a] Server start [:0100000b] LAUNCH snlua mygateserver [:0100000b] Listen on 0.0.0.0:8002 [:0100000b] open by :0100000a [:0100000b] listen on 8002 [:0100000b] client max 2 [:0100000b] nodelay true [:0100000a] gate server setup on 8002 [:0100000a] KILL self [:0100000b] ipaddr: 127.0.0.1:46650 fd: 7 connect #第一個客戶端連線成功 [:0100000b] ipaddr: 127.0.0.1:46652 fd: 8 connect #第二個客戶端連線成功 [:0100000b] fd: 9 disconnect #第三個客戶端連線成功後,馬上關閉
13.4 gateserver其他回撥函式
--如果你希望在監聽埠開啟的時候,做一些初始化操作,可以提供 open 這個方法。
--source 是請求來源地址,conf 是開啟 gate 服務的引數表(埠,連線數,是否延遲)。