1. 程式人生 > >skynet框架應用 (十三) 閘道器服務

skynet框架應用 (十三) 閘道器服務

13 閘道器服務

​ skynet 提供了一個通用模板 lualib/snax/gateserver.lua 來啟動一個閘道器伺服器,通過 TCP 連線和客戶端交換資料。

​ TCP 基於資料流,但一般我們需要以帶長度資訊的資料包的結構來做資料交換。gateserver 做的就是這個工作,把資料流切割成包的形式轉發到可以處理它的地址。

local gateserver = require "snax.gateserver"

local handler = {}        --必須提供一張表,表裡面定義connect、message等相關回調函式

-- register handlers here
gateserver.start(handler)  --閘道器服務的入口函式

13.1最簡單閘道器服務

13.1.1 編寫mygateserver.lua

示例程式碼:mygateserver.lua

local skynet = require "skynet"
local gateserver = require "snax.gateserver"

local handler = {}

--當一個客戶端連結進來,gateserver自動處理連結,並且呼叫該函式,必須要有
function handler.connect(fd, ipaddr)   
skynet.error("ipaddr:"
,ipaddr,"fd:",fd,"connect")
gateserver.openclient(fd) --連結成功不代表馬上可以讀到資料,需要開啟這個套接字,允許fd接收資料 end --當一個客戶端斷開連結後呼叫該函式,必須要有 function handler.disconnect(fd)   skynet.error("fd:", fd, "disconnect") end --當fd有資料到達了,會呼叫這個函式,前提是fd需要呼叫gateserver.openclient開啟 function handler.message(fd, msg, sz) skynet.error
("recv message from fd:", fd)
end gateserver.start(handler)

13.1.2 啟動mygateserver

​ 可以使用普通服務建立方式來建立一個mygateserver服務,但是這個服務啟動後,並不能馬上開始工作,

需要你給mygateserver傳送一個lua訊息open並且告訴gateserver監聽的埠、最大連線數、延時等資訊來開啟mygateserver服務。

程式碼如下openmygateserver.lua

local skynet = require "skynet"

skynet.start(function()
skynet.error("Server start")
local gateserver = skynet.newservice("mygateserver") --啟動剛才寫的閘道器服務
skynet.call(gateserver, "lua", "open", {   --需要給閘道器服務傳送open訊息,來啟動監聽
port = 8002,--監聽的埠
maxclient = 64,--客戶端最大連線數
nodelay = true,--是否延遲TCP
})

skynet.error("gate server setup on", 8002)
skynet.exit()
end)

執行結果:

$ ./skynet examples/config
openmygateserver  #啟動openmygateserver
[:01000010] LAUNCH snlua openmygateserver
[:01000010] Server start
[:01000012] LAUNCH snlua mygateserver  #啟動mygateserver
[:01000012] Listen on 0.0.0.0:8002   #開始監聽埠8002
[:01000010] gate server setup on 8002
[:01000010] KILL self#openmygateserver退出

重開一個終端,啟動一個C語言的socketclient客戶端(在程式碼在9.2中)去連線8002埠,觀察skynet服務情況:

$ ./skynet examples/config
openmygateserver  
[:01000010] LAUNCH snlua openmygateserver
[:01000010] Server start
[:01000012] LAUNCH snlua mygateserver
[:01000012] Listen on 0.0.0.0:8002
[:01000010] gate server setup on 8002
[:01000010] KILL self
[:01000012] ipaddr: 127.0.0.1:48008 fd: 9 connect  #連線進入
[:01000012] fd: 9 disconnect   #斷開連線

​ 上面的結果可以看連線與斷開連線都能執行,但是handler.message並沒有執行。這是由於snax.gateserver

基於TCP協議包裝了一個兩位元組資料長度的協議。而現在版本的socketclient並沒有按照這種協議傳送。

13.2 gateserver應用協議

13.2.1 兩字資料長度協議

​ gateserver應用協議是基於TCP協議做了一層簡單的封裝,前兩個位元組表示資料包的長度len(不計算這兩個表示長度的位元組),高位元組在前低位元組在後(大端序),後面緊跟len位元組數的資料。例如:

\x00\x05    \x31\x32\x33\x34\x35
    |            |
  len           data

​ 由於只用兩位元組表示資料長度,那麼這個包的data最大隻能是65535位元組。這種協議包方式可以解決TCP粘包的問題,也是TCP通訊當中最常用的一種應用層協議包定義方式。

​ 所以如果想通過TCP與gateserver通訊必須要按照這種協議進行組包解包。否則gateserver肯定是不識別的。

13.2.2 打包與解包

​ 打包與解包TCP網路資料可以使用skynet.netpack庫

local netpack = require "skynet.netpack" --使用netpack
--打包資料str,返回一個C指標msg,sz,申請記憶體
netpack.pack(str) 

--解包資料,返回一個lua的字串,會釋放記憶體
netpack.tostring(msg, sz)

13.2.3 client使用長度協議發包

下面我們通過改寫socketclient.c檔案來組包傳送資料:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>

#define MAXLINE 128

void* readthread(void* arg)
{
pthread_detach(pthread_self());
int sockfd = (int)arg;
int n = 0;
char buf[MAXLINE];
while (1) 
    {
n = read(sockfd, buf, MAXLINE);
if (n == 0)
{
printf("the other side has been closed.\n");
close(sockfd);
exit(0);
}
else
write(STDOUT_FILENO, buf, n);
}
return (void*)0;
}

int main(int argc, char *argv[])
{
if(argc != 2)
{
printf("usage:%s port", argv[0]);
return -1;
}
int port = atoi(argv[1]);
struct sockaddr_in servaddr;
int sockfd;
short size, nsize;
char buf[MAXLINE];
unsigned char sendbuf[MAXLINE];

sockfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(port);
connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

pthread_t thid;
pthread_create(&thid, NULL, readthread, (void*)sockfd);

while (fgets(buf, MAXLINE, stdin) != NULL) 
{
size = (short)strlen(buf);   //計算需要傳送的資料包長度
nsize = htons(size);//轉換成大端序
memcpy(sendbuf, &nsize, sizeof(nsize));  //nsize先填入sendbuf
memcpy(sendbuf+sizeof(nsize), buf, size); //再填入buf內容
write(sockfd, sendbuf, size + sizeof(nsize));
}
close(sockfd);
return 0;
}

先執行skynet服務,然後客戶端執行:

$ gcc  socketclient.c -o socketclient -lpthread
./socketclient 8002

切回skynet執行的終端:

$ ./skynet examples/config
testmygateserver
[:01000010] LAUNCH snlua testmygateserver
[:01000010] Server start
[:01000012] LAUNCH snlua mygateserver
[:01000012] Listen on 0.0.0.0:8002
[:01000010] gate server setup on 8002
[:01000010] KILL self
[:01000012] ipaddr: 127.0.0.1:48012 fd: 9 connect
[:01000012] recv message from fd: 9    #已經呼叫handler.message

13.2.4 gateserver解包

​ 上面的實驗中mygateserver.lua中的handler.message已經被呼叫,並且告訴fd為9的套接字發來了資料,msg表示C資料指標,sz表示資料長度,在lua中無法直接使用,需要轉換成lua可識別的資料,需要呼叫netpack.tostring函式來解包。下面來改寫一下mygateserver.lua:

local skynet = require "skynet"
local gateserver = require "snax.gateserver"
local netpack = require "skynet.netpack" --使用netpack

local handler = {}

--當一個客戶端連結進來,gateserver自動處理連結,並且呼叫該函式
function handler.connect(fd, ipaddr)   
skynet.error("ipaddr:",ipaddr,"fd:",fd,"connect")
gateserver.openclient(fd)
end

--當一個客戶端斷開連結後呼叫該函式
function handler.disconnect(fd)
skynet.error("fd:", fd, "disconnect")
end


--接收訊息
function handler.message(fd, msg, sz)
skynet.error("recv message from fd:", fd)
skynet.error(netpack.tostring(msg, sz))   --把 handler.message 方法收到的 msg,sz 轉換成一個 lua string,並釋放 msg 佔用的 C 記憶體。
end

gateserver.start(handler)

執行結果:

$ ./skynet examples/config
openmygateserver
[:01000010] LAUNCH snlua openmygateserver
[:01000010] Server start
[:01000012] LAUNCH snlua mygateserver
[:01000012] Listen on 0.0.0.0:8002
[:01000010] gate server setup on 8002
[:01000010] KILL self
[:01000012] ipaddr: 127.0.0.1:48018 fd: 9 connect
[:01000012] recv message from fd: 9
[:01000012] aaaaaaaaaaa     #正常接收到資料。

[:01000012] fd: 9 disconnect

需要注意的是:msg是一個C指標指向了一塊堆空間,如果你不進行任何處理,那麼也要呼叫skynet.trash來釋放底層的記憶體。

13.3 控制客戶端連線數

​ 閘道器服務最重要的任務就是控制客戶端連線數,避免大量客戶登入到這個服務上。

修改openmygateserver.lua

local skynet = require "skynet"

skynet.start(function()
skynet.error("Server start")
local gateserver = skynet.newservice("mygateserver") 
skynet.call(gateserver, "lua", "open", {   
port = 8002,--監聽的埠
maxclient = 2,--客戶端最大連線數改為2個
nodelay = true,--是否延遲TCP
})

skynet.error("gate server setup on", 8002)
skynet.exit()
end)

執行openmygateserver.lua,再執行三個socketclient,結果如下:

openmygateserver
[:0100000a] LAUNCH snlua openmygateserver
[:0100000a] Server start
[:0100000b] LAUNCH snlua mygateserver
[:0100000b] Listen on 0.0.0.0:8002
[:0100000b] open by  :0100000a
[:0100000b] listen on 8002
[:0100000b] client max 2
[:0100000b] nodelay true
[:0100000a] gate server setup on 8002
[:0100000a] KILL self
[:0100000b] ipaddr: 127.0.0.1:46650 fd: 7 connect  #第一個客戶端連線成功
[:0100000b] ipaddr: 127.0.0.1:46652 fd: 8 connect  #第二個客戶端連線成功
[:0100000b] fd: 9 disconnect #第三個客戶端連線成功後,馬上關閉

13.4 gateserver其他回撥函式

--如果你希望在監聽埠開啟的時候,做一些初始化操作,可以提供 open 這個方法。
--source 是請求來源地址,conf 是開啟 gate 服務的引數表(埠,連線數,是否延遲)。