1. 程式人生 > >Erlang簡單並行server

Erlang簡單並行server

pac [] binary sta his soc recursive csv client

Erlang簡單並行服務器

(金慶的專欄)

Erlang並行服務器為每一個Tcp連接創建相應的連接進程,處理client數據。



參考 Erlang程序設計(第2版)
17.1.3 順序和並行服務器

並行服務器的訣竅是:每當gen_tcp:accept收到一個新連接時就馬上分裂一個新進程。


為每一個新套接字連接創建一個並行進程。

-module(gs_svr).
-author("jinqing").

-behaviour(gen_server).

%% API
-export([start_link/0]).

init([]) ->
gs_listener:start_parallel(),

{ok, #{}}.

gs_svr(GameServer gen_server)啟動Tcp監聽。並維護連接,如連接計數,發送廣播。

start_parallel()創建監聽端口,然後創建連接進程。

start_parallel() ->
Port = server_csv:get_my_port(),
lager:info("Starting game server on port ~p...", [Port]),
{ok, ListenSocket} = gen_tcp:listen(Port,
[binary, {packet, 4},
{packet_size, 256 * 1024}, % limit packet size
{reuseaddr, true},
{nodelay, true},
{backlog, 999999},
{active, once}]),
connection:spawn_connection(ListenSocket).


spawn_connection()創建連接進程。每接受一個連接就再創建一個新的連接進程。



-module(connection).
-author("jinqing").


%% API
-export([spawn_connection/1]).
-export([parallel_connect/1, loop/2]).

-spec spawn_connection(ListenSocket :: gen_tcp:socket()) -> pid().
spawn_connection(ListenSocket) ->
spawn(fun() -> ?

MODULE:parallel_connect(ListenSocket) end).

-spec parallel_connect(ListenSocket :: gen_tcp:socket()) -> ok.
parallel_connect(ListenSocket) ->
{ok, Socket} = gen_tcp:accept(ListenSocket),
spawn_connection(ListenSocket),

gs_svr:cast_connection_new(self()),
ConnStat = conn_stat:new(),
erlang:send_after(1000, self(), timer_sec),
try ?MODULE:loop(Socket, ConnStat)
catch
Type:E -> lager:error("loop() ~p:~p. ~p",
[Type, E, erlang:get_stacktrace()])
end,
gs_svr:cast_connection_ended(self()),
ok.

-spec loop(Socket :: gen_tcp:socket(), ConnStat :: conn_stat:conn_stat()) -> any().
loop(Socket, ConnStat) ->
receive
{tcp, Socket, Bin} ->
NewConnStat = rpc_handler:handle_bin(Socket, Bin, ConnStat),
inet:setopts(Socket, [{active, once}]),
NewConnStat2 = cutil_dos_checker:on_data(size(Bin), NewConnStat),
?MODULE:loop(Socket, NewConnStat2#{idle_sec=>0});
{tcp_closed, Socket} ->
save_on_end(ConnStat);
{tcp_error, Socket, Reason} ->
save_on_end(ConnStat);

{gs_to_connection, Msg} ->
NewConnStat = handle_gs_msg(Msg, Socket, ConnStat),
?MODULE:loop(Socket, NewConnStat);

timer_sec ->
case conn_timer:timer_sec(ConnStat) of
{ok, NewConnStat} ->
erlang:send_after(1000, self(), timer_sec),
?MODULE:loop(Socket, NewConnStat);
end;
Other ->
lager:error("Unknown msg: ~p", [Other]),
?MODULE:loop(Socket, ConnStat)
end. % This is tail-recursive.

缺點是連接進程沒有增加監控樹。

gs_svr出錯重新啟動時,連接進程connection應該斷開並退出。

Erlang簡單並行server