1. 程式人生 > 實用技巧 >第二十章 多核程式設計

第二十章 多核程式設計

20.1 如何在多核的CPU上更有效率的執行

20.1.1 使用大量程序

這個標準…顯而易見。

20.1.2 避免副作用

因為存在副作用, 導致使用共享記憶體方式時必須使用鎖機制, 雖然Erlang沒有共享記憶體, 但對於可以被多個程序共享的ETS表和DETS表還是應該特別注意。

20.1.3 順序瓶頸

對於本質就是順序性的問題, 顯然無法做到併發化。
而磁碟IO, 也是一個無法避免的自然瓶頸。
註冊程序, 人為的建立了一個潛在的順序瓶頸。

20.2 並行化順序程式碼

並行化的map

pmap(F, L) ->
    S = self(),
    Ref = erlang:make_ref(),
    %% 對於列表中的每個引數都啟動一個程序去處理 
    Pids = map(fun(I) ->spawn(fun() ->do_f(S, Ref, F, I) end) end, L),
    gather(Pids, Ref).

%% 處理完成後向父程序傳送結果
do_f(Parent, Ref, F, I) ->
    Parent ! {self(), Ref, (catch F(I))}.

%% 以正確的順序拼接每個程序的執行結果
gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} ->[Ret|gather(T, Ref)]
    end;
gather([], _) ->[].

什麼時候可以用pmap:1. 計算量很小的函式; 2. 不建立太多的程序; 3. 在恰當的抽象層次上思考

20.3 小訊息, 大計算

啟動SMP Erlang

# -smp  啟動SMP Erlang
# +S N  使用N個Erlang虛擬機器
$ erl -smp +S N
  測試不同的虛擬機器數量對效能的影響
#!/bin/sh
echo "" >results
for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 
do
   echo $i
   erl -boot start_clean -noshell -smp +S $i -s ptests tests $i >> results
done

20.4 map-reduce演算法和磁碟索引程式

20.4.1 map-reduce演算法

%% map函式     MapReduce每次給列表中的每個X建立一個新的程序 
F1 = fun(Pid, X) ->void,

%% reduce函式  針對每個鍵值, 將它所對應的所有值合併到一起
%% Acc0       累加器
F2 = fun(key, [Value], Acc0) ->Acc
L = [X]
Acc = X = term()

%% 呼叫形式
mapreduce(F1, F2, Acc0, L) ->Acc

具體的實現

mapreduce(F1, F2, Acc0, L) ->
    S = self(),
    %% 啟動新的程序執行reduce函式 
    Pid = spawn(fun() ->reduce(S, F1, F2, Acc0, L) end),
    receive
        {Pid, Result} ->
            Result
    end.


reduce(Parent, F1, F2, Acc0, L) ->
    process_flag(trap_exit, true),
    ReducePid = self(),

    %% map過程的實現
    %% 對於列表中的每個值都啟動一個程序在do_job中呼叫F1進行處理 
    foreach(fun(X) ->
            spawn_link(fun() ->do_job(ReducePid, F1, X) end)
        end, L),
    N = length(L),
    %% 用字典儲存鍵值
    Dict0 = dict:new(),
    %% 等待map過程完成
    Dict1 = collect_replies(N, Dict0),
    %% 呼叫F2按相同鍵值進行合併 
    Acc = dict:fold(F2, Acc0, Dict1),
    %% 向MapReduce程序通知執行結果
    Parent ! {self(), Acc}.

%% 按鍵值進行合併的過程
collect_replies(0, Dict) ->
    Dict;
collect_replies(N, Dict) ->
    receive
        %% 對鍵-值的處理
        %% 存在Key則將Val相加, 否則插入到字典
        {Key, Val} ->
            case dict:is_key(Key, Dict) of
                true ->
                    Dict1 = dict:append(Key, Val, Dict),
                    collect_replies(N, Dict1);
                false ->
                    Dict1 = dict:store(Key,[Val], Dict),
                    collect_replies(N, Dict1)
            end;
        {'EXIT', _,  _Why} ->
            collect_replies(N-1, Dict)
    end.

%% 執行指定的map函式
do_job(ReducePid, F, X) ->
    F(ReducePid, X).

測試程式碼:

-module(test_mapreduce).
-compile(export_all).
-import(lists, [reverse/1, sort/1]).

test() ->
    wc_dir(".").

wc_dir(Dir) ->
    %% map函式
    F1 = fun generate_words/2,
    %% reduce函式
    F2 = fun count_words/3,
    %% 引數列表
    Files = lib_find:files(Dir, ".*[.](erl)", false),
    %% 呼叫mapreduce處理
    L1 = phofs:mapreduce(F1, F2, [], Files),
    reverse(sort(L1)).

%% 查詢檔案中的每個單詞
generate_words(Pid, File) ->
    F = fun(Word) ->Pid ! {Word, 1} end,
    lib_misc:foreachWordInFile(File, F).

%% 統計有多少個不同的單詞
count_words(Key, Vals, A) ->
    [{length(Vals), Key}|A].

執行結果:

1> test_mapreduce:test().
[{115,"L"},
 {84,"T"},
 {80,"1"},
 {77,"end"},
 {72,"X"},
 {52,"H"},
 {47,"file"},
 {46,"S"},
 {44,"of"},
 {43,"F"},
 {40,"2"},
 {39,"Key"},
 {39,"Fun"},
 {37,"is"},
 {35,"case"},
 {34,"fun"},
 {34,"Pid"},
 {34,"N"},
 {33,"File"},
 {32,"true"},
 {31,"Str"},
 {28,"ok"},
 {27,"prefix"},
 {27,"Val"},
 {27,"I"},
 {26,"to"},
 {26,[...]},
 {24,...},
 {...}|...]

20.4.2 全文檢索

    1. 反向索引
      檔案-內容對照表
檔名 內容
/home/dogs rover jack buster winston
/home/animals/cats zorro daisy jaguar
/home/cars rover jaguar ford

索引-檔案對照表

索引 檔名
1 /home/dogs
2 /home/animals/cats
3 /home/cars

單詞-索引對照表

單詞 索引
rover 1,3
jack 1
buster 1
winston 1
zorro 2
daisy 2
jaguar 2,3
ford 3
    1. 反向索引的查詢
      通過單詞-索引, 索引-檔案的對照表查詢單詞與檔案的對應關係
    1. 反向索引的資料結構
      因為一個常見的詞可能在成千上萬的檔案中出現, 因此使用數字索引代替檔名可大大節省儲存空間, 因此需要檔案與索引的對照表。
      對於每個在檔案中出現的單詞, 都需要記錄此檔案的索引號, 因此建立單詞與索引的對照表。

20.4.3 索引器的操作

%% 啟動一個名為indexer_server的伺服器程序
%% 啟動一個worker程序來執行索引動作 
start() ->
    indexer_server:start(output_dir()),
    spawn_link(fun() ->worker() end).

worker() ->
    possibly_stop(),
    %% 返回下一個需要索引的目錄
    case indexer_server:next_dir() of
    {ok, Dir} ->
        %% 查詢目錄下需要進行索引的檔案 
        Files = indexer_misc:files_in_dir(Dir),
        %% 為其建立索引
        index_these_files(Files),
        %% 檢測是否正常完成
        indexer_server:checkpoint(),
        possibly_stop(),
        sleep(10000),
        worker();
    done ->
        true
    end.

%% 使用MapReduce演算法實現建立索引的並行處理 
index_these_files(Files) ->
    Ets = indexer_server:ets_table(),
    OutDir = filename:join(indexer_server:outdir(), "index"),
    %% map函式
    F1 = fun(Pid, File) ->indexer_words:words_in_file(Pid, File, Ets) end,
    %% reduce函式
    F2 = fun(Key, Val, Acc) ->handle_result(Key, Val, OutDir, Acc) end,
    indexer_misc:mapreduce(F1, F2, 0, Files).

%% 按照Key值進行合併
handle_result(Key, Vals, OutDir, Acc) ->
    add_to_file(OutDir, Key, Vals),
    Acc + 1.

%% 將索引陣列新增到Word中
add_to_file(OutDir, Word, Is) ->
    L1 = map(fun(I) -><<I:32>> end, Is),
    OutFile = filename:join(OutDir, Word),
    case file:open(OutFile, [write,binary,raw,append]) of
    {ok, S} ->
        file:pwrite(S, 0, L1),
        file:close(S);
    {error, E} ->
          exit({ebadFileOp, OutFile, E})
    end.

20.4.4 執行索引器

1> indexer:cold_start().
2> indexer:start().
3> indexer:stop().

20.4.5 評論

可以改進的三個方面

1. 改進單詞抽取
2. 改進map-reduce演算法, 以便處理海量資料
3. 方向索引的資料結構只使用了檔案系統來儲存

20.4.6 索引器的程式碼