第17章 套接字程式設計
阿新 • • 發佈:2018-12-16
1.使用TCP從伺服器獲取資料
%%socket_examples.erl -module(socket_examples). -export([nano_get_url/0,nano_get_url/1,receive_data/2]). nano_get_url() -> nano_get_url("www.baidu.com"). nano_get_url(Host) -> {ok,Socket}=gen_tcp:connect(Host,80,[binary,{packet,0}]), ok=gen_tcp:send(Socket,"Get / HTTP/1.0\r\n\r\n"), receive_data(Socket,[]). receive_data(Socket,SoFar) -> receive {tcp,Socket,Bin} -> receive_data(Socket,[Bin|SoFar]); {tcp_closed,Socket} -> list_to_binary(lists:reverse(SoFar)) end.
- 工作方式:
- 1)呼叫gen_tcp:connect來開啟一個到www.baidu.com80埠的TCP套接字。連線呼叫裡的binary引數告訴系統要以“二進位制”模式開啟套接字,並把所有資料用二進位制型傳給應用程式。{packet,0}的意思是把未經修改的TCP資料直接傳給應用程式。
- 2) 呼叫gen_tcp:send,把訊息GET / HTTP/1.0\r\n\r\n傳送給套接字,然後等待回覆。這個回覆並不是放在一個數據包裡,而是分成多個片段,一次傳送一點。這些片段會被接收成為訊息序列,傳送給開啟(或控制)套接字的程序。
- 3)收到一個{tcp,Socket,Bin}訊息。這個元組的第三個引數是一個二進位制型,原因是開啟套接字時使用了二進位制模式。這個訊息是Web伺服器傳送給我們的資料片段之一。把它新增到目前已收到的片段列表中,然後等待下一個片段。
- 4)收到一個{tcp_closed, Socket}訊息。這會在伺服器完成資料傳送時發生。
- 5)當所有片段都到達後,因為它們的儲存順序是錯誤的,所以反轉它們並連線所有片段。
- 呼叫示例:B=socket_examples:nano_get_url().
- 轉換輸出格式: io:format("~p~n",[B]). 或是 string:tokens(binary_to_list(B),"\r\n").
2.一個簡單的TCP伺服器
%%socket_server.erl: -module(socket_server). -export([start_nano_server/0,loop/1,nano_client_eval/1]). start_nano_server() -> {ok,Listen} =gen_tcp:listen(2345,[binary,{packet,4},{reuseaddr,true},{active,true}]), {ok,Socket}=gen_tcp:accept(Listen), gen_tcp:close(Listen), loop(Socket). loop(Socket) -> receive {tcp,Socket,Bin} -> io:format("Server received binary =~p~n",[Bin]), Str=binary_to_term(Bin), io:format("Server (unpacked) ~p~n",[Str]), Reply=lib_misc:string2value(Str), io:format("Server replying=~p~n",[Reply]), gen_tcp:send(Socket,term_to_binary(Reply)), loop(Socket); {tcp_closed,Socket} -> io:format("Server socket closed~n") end. nano_client_eval(Str) -> {ok,Socket}= gen_tcp:connect("localhost",2345,[binary,{packet,4}]), ok=gen_tcp:send(Socket,term_to_binary(Str)), receive {tcp,Socket,Bin}-> io:format("Client received binary =~p~n",[Bin]), Val=binary_to_term(Bin), io:format("Client result =~p~n",[Val]), gen_tcp:close(Socket) end.
- 工作方式:
- 1)首先,呼叫gen_tcp:listen來監聽2345埠的連線,並設定訊息的打包約定。{packet,4}的意思是每個應用程式訊息前部都有一個4位元組的長度包頭。然後gen_tcp:listen(..)會返回{ok, Listen}或{error, Why},但我們只關心能夠開啟套接字的返回值。因此,編寫如下程式碼 {ok,Listen}=gen_tcp:listen(....).
- 這會讓程式在gen_tcp:listen返回{error, ...}時丟擲一個模式匹配異常錯誤。在成功的情況下,這個語句會繫結Listen到剛監聽的套接字上。我們只能對監聽埠做一件事,那就是把它用作gen_tcp:accept的引數。
- 2)現在呼叫gen_tcp:accept(Listen)。在這個階段,程式會掛起並等待一個連線。當我們收到連線時,這個函式就會返回變數Socket,它綁定了可以與連線客戶端通訊的套接字。
- 3)在accept返回後立即呼叫gen_tcp:close(Listen)。這樣就關閉了監聽套接字,使伺服器不再接收任何新連線。這麼做不會影響現有連線,只會阻止新連線。
- 4)解碼輸入資料
- 5)然後執行字串
- 6)然後編碼回覆資料並把它發回套接字
- 7)同時定義一個客戶端.
- 應用示例:(注意要開啟兩個shell視窗)
- socket_server:start_nano_server().
- socket_server:nano_client_eval("lsit_to_tuple([2+3*4,10+20])").
3.順序和並行伺服器
- 1)順序伺服器
將原始碼: start_nano_server() -> {ok,Listen}=gen_tcp:listen(....), {ok,Socket}=gen_tcp:accept(Listen), loop(Socket).... 修改為: start_seq_server() -> {ok,Listen} =gen_tcp:listen(....), seq_loop(Listen). seq_loop(Listen) -> {ok,Socket}=gen_tcp:accept(Listen), loop(Socket), seq_loop(Listen). loop(....) .... %和以前一樣
- 2)並行伺服器:每當gen_tcp:accept收到一個新連線時就立即分裂一個新程序
將原始碼: start_parallel_server() -> {ok,Listen}=gen_tcp:listen(...), spawn(fun() -> par_connect(Listen) end). 改為: par_connect(Listen) -> {ok,Socket}=gen_tcp:accept(Listen), spawn(fun() -> par_connect(Listen )end), loop(Socket). loop(...).....%和之前一樣
4.注意點:
- 1)建立某個套接字(通過呼叫gen_tcp:accept或gen_tcp:connect)的程序被稱為該套接字的控制程序。所有來自套接字的訊息都會被髮送到控制程序。如果控制程序掛了,套接字就會被關閉。某個套接字的控制程序可以通過呼叫gen_tcp:controlling_process(Socket, NewPid)修改成NewPid
- 2)我們的並行伺服器可能會創建出幾千個連線,所以可以限制最大同時連線數。實現的方法可以是維護一個計數器來統計任一時刻有多少活動連線。每當收到一個新連線時就讓計數器加1,每當一個連線結束時就讓它減1。可以用它來限制系統裡的同時連線總數。
- 3)接受一個連線後,顯式設定必要的套接字選項是一種很好的做法,就像這樣:
{ok,Socket}=gen_tcp:accept(Listen), inet:setopts(Socket,[{packet,4},binary,{nodelay,true},{active,true}]), loop(Socket)
- 4)Erlang 的 R11B-3 版開始允許多個 Erlang 程序對同一個監聽套接字呼叫 gen_tcp:accept/1。這讓編寫並行伺服器變得簡單了,因為你可以生成一個預先分裂好的程序池,讓它們都處在gen_tcp:accept/1的等待狀態。
5.主動和被動套接字
- 1)三種開啟模式:主動(active),單次主動(active once) ,被動(passive) . 通過在gen_tcp:connect(Address, Port, Options)或gen_tcp:listen(Port, Options)的Options引數里加入{active, true | false | once}選項實現的。
- 2)主動資訊接收(非阻塞式)
{ok,Listen}=gen_tcp:listen(Port,[....,{active,true}...]), {ok,Socket}=gen_tcp:accept(Listen), loop(Socket). loop(Socket) -> receive {tcp,Socket,Data}-> .......對資料進行操作.... {tcp_closed,Socket}-> ..... end. %當客戶端生成資料的速度快於伺服器處理資料的速度,系統就會遭受資料洪流的衝擊
- 3)被動資訊接收(阻塞式)
{ok,Listen}=gen_tcp:listen(Port,[....,{active,false}...]), {ok,Socket}=gen_tcp:accept(Listen), loop(Socket). loop(Socket) -> case gen_tcp:recv(Socket,N) of {ok,B} -> .......對資料進行操作.... loop(Socket); {error,closed} ..... end. %每次想要接收資料就呼叫gen_tcp:recv,否則客戶端會一直阻塞.
- 4)混合資訊接收(部分阻塞式)
{ok,Listen}=gen_tcp:listen(Port,[....,{active,once}...]), {ok,Socket}=gen_tcp:accept(Listen), loop(Socket). loop(Socket) -> receive {tcp,Socket,Data} -> .......對資料進行操作.... %當你準備好啟動下一個資訊的接收時 inet:setopts(Sock,[{active,once}]), loop(Socket); {tcp_closed,Socket}-> ..... end. %當控制程序收到一個訊息後,必須顯式呼叫inet:setopts才能重啟下一個訊息的接收,否則系統處於阻塞狀態.
6.套接字錯誤處理
- 當伺服器因為程式錯誤掛了,那麼伺服器支配的套接字就會被自動關閉,同時向客戶端傳送一個{tcp_closed,Socket}訊息
7.UDP
%應用示例:一個UDP階乘伺服器
-module(udp_test).
-export([start_server/0,client/1]).
start_server() ->
spawn(fun() -> server(4000) end).
%伺服器
server(Port) ->
{ok,Socket} =gen_udp:open(Port,[binary]),
io:format("server opened socket:~p~n",[Socket]),
loop(Socket).
loop(Socket) ->
receive
{udp,Socket,Host,Port,Bin}=Msg ->
io:format("server received:~p~n",[Msg]),
N=binary_to_term(Bin),
Fac=fac(N),
gen_udp:send(Socket,Host,Port,term_to_binary(Fac)),
loop(Socket)
end.
fac(0)->1;
fac(N) ->N* fac(N-1).
%客戶端
client(N) ->
{ok,Socket}=gen_udp:open(0,[binary]),
io:format("client opened socket=~p~n",[Socket]),
ok=gen_udp:send(Socket,"localhost",4000,term_to_binary(N)),
Value=receive
{udp,Socket,_,_,Bin}=Msg ->
io:format("client received:~p~n",[Msg]),
binary_to_term(Bin)
after 2000 ->
0
end,
gen_udp:close(Socket),
value.
8.UDP資料包可能會二次傳輸,所以可呼叫Erlang的內建函式make_ref,可以確保返回一個全域性唯一的引用.