1. 程式人生 > >第17章 套接字程式設計

第17章 套接字程式設計

1.使用TCP從伺服器獲取資料

%%socket_examples.erl
-module(socket_examples).
-export([nano_get_url/0,nano_get_url/1,receive_data/2]).

nano_get_url() ->
    nano_get_url("www.baidu.com").

nano_get_url(Host) ->
    {ok,Socket}=gen_tcp:connect(Host,80,[binary,{packet,0}]),
    ok=gen_tcp:send(Socket,"Get / HTTP/1.0\r\n\r\n"),
    receive_data(Socket,[]).

receive_data(Socket,SoFar) ->
    receive
        {tcp,Socket,Bin} ->
            receive_data(Socket,[Bin|SoFar]);
        {tcp_closed,Socket} ->
            list_to_binary(lists:reverse(SoFar))
    end.
  • 工作方式:
  • 1)呼叫gen_tcp:connect來開啟一個到www.baidu.com80埠的TCP套接字。連線呼叫裡的binary引數告訴系統要以“二進位制”模式開啟套接字,並把所有資料用二進位制型傳給應用程式。{packet,0}的意思是把未經修改的TCP資料直接傳給應用程式。
  • 2) 呼叫gen_tcp:send,把訊息GET / HTTP/1.0\r\n\r\n傳送給套接字,然後等待回覆。這個回覆並不是放在一個數據包裡,而是分成多個片段,一次傳送一點。這些片段會被接收成為訊息序列,傳送給開啟(或控制)套接字的程序。
  • 3)收到一個{tcp,Socket,Bin}訊息。這個元組的第三個引數是一個二進位制型,原因是開啟套接字時使用了二進位制模式。這個訊息是Web伺服器傳送給我們的資料片段之一。把它新增到目前已收到的片段列表中,然後等待下一個片段。
  • 4)收到一個{tcp_closed, Socket}訊息。這會在伺服器完成資料傳送時發生。
  • 5)當所有片段都到達後,因為它們的儲存順序是錯誤的,所以反轉它們並連線所有片段。
  • 呼叫示例:B=socket_examples:nano_get_url().
  • 轉換輸出格式: io:format("~p~n",[B]). 或是 string:tokens(binary_to_list(B),"\r\n").

2.一個簡單的TCP伺服器

%%socket_server.erl:
-module(socket_server).
-export([start_nano_server/0,loop/1,nano_client_eval/1]).

start_nano_server() ->
    {ok,Listen} =gen_tcp:listen(2345,[binary,{packet,4},{reuseaddr,true},{active,true}]),
    {ok,Socket}=gen_tcp:accept(Listen),
    gen_tcp:close(Listen),
    loop(Socket).
loop(Socket) ->
    receive
        {tcp,Socket,Bin} ->
            io:format("Server received binary =~p~n",[Bin]),
            Str=binary_to_term(Bin),
            io:format("Server (unpacked) ~p~n",[Str]),
            Reply=lib_misc:string2value(Str),
            io:format("Server replying=~p~n",[Reply]),
            gen_tcp:send(Socket,term_to_binary(Reply)),
        loop(Socket);
        {tcp_closed,Socket} ->
            io:format("Server socket closed~n")
    end.
nano_client_eval(Str) ->
    {ok,Socket}=
        gen_tcp:connect("localhost",2345,[binary,{packet,4}]),
    ok=gen_tcp:send(Socket,term_to_binary(Str)),
    receive
        {tcp,Socket,Bin}->
            io:format("Client received binary =~p~n",[Bin]),
            Val=binary_to_term(Bin),
            io:format("Client result =~p~n",[Val]),
            gen_tcp:close(Socket)
    end.

 

  • 工作方式:
  • 1)首先,呼叫gen_tcp:listen來監聽2345埠的連線,並設定訊息的打包約定。{packet,4}的意思是每個應用程式訊息前部都有一個4位元組的長度包頭。然後gen_tcp:listen(..)會返回{ok, Listen}或{error, Why},但我們只關心能夠開啟套接字的返回值。因此,編寫如下程式碼 {ok,Listen}=gen_tcp:listen(....).
  • 這會讓程式在gen_tcp:listen返回{error, ...}時丟擲一個模式匹配異常錯誤。在成功的情況下,這個語句會繫結Listen到剛監聽的套接字上。我們只能對監聽埠做一件事,那就是把它用作gen_tcp:accept的引數。
  • 2)現在呼叫gen_tcp:accept(Listen)。在這個階段,程式會掛起並等待一個連線。當我們收到連線時,這個函式就會返回變數Socket,它綁定了可以與連線客戶端通訊的套接字。
  • 3)在accept返回後立即呼叫gen_tcp:close(Listen)。這樣就關閉了監聽套接字,使伺服器不再接收任何新連線。這麼做不會影響現有連線,只會阻止新連線。
  • 4)解碼輸入資料
  • 5)然後執行字串
  • 6)然後編碼回覆資料並把它發回套接字
  • 7)同時定義一個客戶端.
  • 應用示例:(注意要開啟兩個shell視窗)
  • socket_server:start_nano_server().
  • socket_server:nano_client_eval("lsit_to_tuple([2+3*4,10+20])").

3.順序和並行伺服器

  • 1)順序伺服器
    將原始碼: 
    start_nano_server() -> 
        {ok,Listen}=gen_tcp:listen(....), 
        {ok,Socket}=gen_tcp:accept(Listen), 
        loop(Socket)....
    修改為:
    start_seq_server() -> 
        {ok,Listen} =gen_tcp:listen(....), 
        seq_loop(Listen).
        seq_loop(Listen) -> 
            {ok,Socket}=gen_tcp:accept(Listen), 
            loop(Socket), 
            seq_loop(Listen). 
    loop(....) .... %和以前一樣
  • 2)並行伺服器:每當gen_tcp:accept收到一個新連線時就立即分裂一個新程序
    將原始碼:
    start_parallel_server() -> 
        {ok,Listen}=gen_tcp:listen(...), 
        spawn(fun() -> 
            par_connect(Listen) end).
    改為:
    par_connect(Listen) -> 
        {ok,Socket}=gen_tcp:accept(Listen), 
        spawn(fun() -> par_connect(Listen )end), 
        loop(Socket). 
    loop(...).....%和之前一樣

4.注意點:

  • 1)建立某個套接字(通過呼叫gen_tcp:accept或gen_tcp:connect)的程序被稱為該套接字的控制程序。所有來自套接字的訊息都會被髮送到控制程序。如果控制程序掛了,套接字就會被關閉。某個套接字的控制程序可以通過呼叫gen_tcp:controlling_process(Socket, NewPid)修改成NewPid
  • 2)我們的並行伺服器可能會創建出幾千個連線,所以可以限制最大同時連線數。實現的方法可以是維護一個計數器來統計任一時刻有多少活動連線。每當收到一個新連線時就讓計數器加1,每當一個連線結束時就讓它減1。可以用它來限制系統裡的同時連線總數。
  • 3)接受一個連線後,顯式設定必要的套接字選項是一種很好的做法,就像這樣:
    {ok,Socket}=gen_tcp:accept(Listen), 
    inet:setopts(Socket,[{packet,4},binary,{nodelay,true},{active,true}]), 
    loop(Socket)

     

  • 4)Erlang 的 R11B-3 版開始允許多個 Erlang 程序對同一個監聽套接字呼叫 gen_tcp:accept/1。這讓編寫並行伺服器變得簡單了,因為你可以生成一個預先分裂好的程序池,讓它們都處在gen_tcp:accept/1的等待狀態。

5.主動和被動套接字

  • 1)三種開啟模式:主動(active),單次主動(active once) ,被動(passive) . 通過在gen_tcp:connect(Address, Port, Options)或gen_tcp:listen(Port, Options)的Options引數里加入{active, true | false | once}選項實現的。
  • 2)主動資訊接收(非阻塞式)
    {ok,Listen}=gen_tcp:listen(Port,[....,{active,true}...]),
    {ok,Socket}=gen_tcp:accept(Listen),
    loop(Socket).
        loop(Socket) ->
            receive
                {tcp,Socket,Data}->
                .......對資料進行操作....
                {tcp_closed,Socket}->
                .....
            end. %當客戶端生成資料的速度快於伺服器處理資料的速度,系統就會遭受資料洪流的衝擊
  • 3)被動資訊接收(阻塞式)
    {ok,Listen}=gen_tcp:listen(Port,[....,{active,false}...]),
    {ok,Socket}=gen_tcp:accept(Listen),
    loop(Socket).
    loop(Socket) ->
        case gen_tcp:recv(Socket,N) of
            {ok,B} ->
            .......對資料進行操作....
                loop(Socket);
            {error,closed}
                .....
        end. %每次想要接收資料就呼叫gen_tcp:recv,否則客戶端會一直阻塞.
  • 4)混合資訊接收(部分阻塞式)
    {ok,Listen}=gen_tcp:listen(Port,[....,{active,once}...]),
    {ok,Socket}=gen_tcp:accept(Listen),
    loop(Socket).
    loop(Socket) ->
        receive
            {tcp,Socket,Data} ->
                .......對資料進行操作....
                %當你準備好啟動下一個資訊的接收時
                inet:setopts(Sock,[{active,once}]),
                loop(Socket);
            {tcp_closed,Socket}->
                .....
        end. %當控制程序收到一個訊息後,必須顯式呼叫inet:setopts才能重啟下一個訊息的接收,否則系統處於阻塞狀態.

6.套接字錯誤處理

  • 當伺服器因為程式錯誤掛了,那麼伺服器支配的套接字就會被自動關閉,同時向客戶端傳送一個{tcp_closed,Socket}訊息

7.UDP

%應用示例:一個UDP階乘伺服器
-module(udp_test).
-export([start_server/0,client/1]).
start_server() ->
    spawn(fun() -> server(4000) end).
%伺服器
server(Port) ->
    {ok,Socket} =gen_udp:open(Port,[binary]),
    io:format("server opened socket:~p~n",[Socket]),
    loop(Socket).
loop(Socket) ->
    receive
        {udp,Socket,Host,Port,Bin}=Msg ->
            io:format("server received:~p~n",[Msg]),
            N=binary_to_term(Bin),
            Fac=fac(N),
            gen_udp:send(Socket,Host,Port,term_to_binary(Fac)),
            loop(Socket)
    end.
fac(0)->1;
fac(N) ->N* fac(N-1).
%客戶端
client(N) ->
    {ok,Socket}=gen_udp:open(0,[binary]),
    io:format("client opened socket=~p~n",[Socket]),
    ok=gen_udp:send(Socket,"localhost",4000,term_to_binary(N)),
    Value=receive
        {udp,Socket,_,_,Bin}=Msg ->
            io:format("client received:~p~n",[Msg]),
            binary_to_term(Bin)
        after 2000 ->
                0
        end,
    gen_udp:close(Socket),
    value.

8.UDP資料包可能會二次傳輸,所以可呼叫Erlang的內建函式make_ref,可以確保返回一個全域性唯一的引用.