第二十章 多核程式設計
阿新 • • 發佈:2020-08-24
20.1 如何在多核的CPU上更有效率的執行
20.1.1 使用大量程序
這個標準…顯而易見。
20.1.2 避免副作用
因為存在副作用, 導致使用共享記憶體方式時必須使用鎖機制, 雖然Erlang沒有共享記憶體, 但對於可以被多個程序共享的ETS表和DETS表還是應該特別注意。
20.1.3 順序瓶頸
對於本質就是順序性的問題, 顯然無法做到併發化。
而磁碟IO, 也是一個無法避免的自然瓶頸。
註冊程序, 人為的建立了一個潛在的順序瓶頸。
20.2 並行化順序程式碼
並行化的map
pmap(F, L) -> S = self(), Ref = erlang:make_ref(), %% 對於列表中的每個引數都啟動一個程序去處理 Pids = map(fun(I) ->spawn(fun() ->do_f(S, Ref, F, I) end) end, L), gather(Pids, Ref). %% 處理完成後向父程序傳送結果 do_f(Parent, Ref, F, I) -> Parent ! {self(), Ref, (catch F(I))}. %% 以正確的順序拼接每個程序的執行結果 gather([Pid|T], Ref) -> receive {Pid, Ref, Ret} ->[Ret|gather(T, Ref)] end; gather([], _) ->[].
什麼時候可以用pmap:1. 計算量很小的函式; 2. 不建立太多的程序; 3. 在恰當的抽象層次上思考
20.3 小訊息, 大計算
啟動SMP Erlang
# -smp 啟動SMP Erlang # +S N 使用N個Erlang虛擬機器 $ erl -smp +S N 測試不同的虛擬機器數量對效能的影響 #!/bin/sh echo "" >results for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 do echo $i erl -boot start_clean -noshell -smp +S $i -s ptests tests $i >> results done
20.4 map-reduce演算法和磁碟索引程式
20.4.1 map-reduce演算法
%% map函式 MapReduce每次給列表中的每個X建立一個新的程序
F1 = fun(Pid, X) ->void,
%% reduce函式 針對每個鍵值, 將它所對應的所有值合併到一起
%% Acc0 累加器
F2 = fun(key, [Value], Acc0) ->Acc
L = [X]
Acc = X = term()
%% 呼叫形式
mapreduce(F1, F2, Acc0, L) ->Acc
具體的實現
mapreduce(F1, F2, Acc0, L) -> S = self(), %% 啟動新的程序執行reduce函式 Pid = spawn(fun() ->reduce(S, F1, F2, Acc0, L) end), receive {Pid, Result} -> Result end. reduce(Parent, F1, F2, Acc0, L) -> process_flag(trap_exit, true), ReducePid = self(), %% map過程的實現 %% 對於列表中的每個值都啟動一個程序在do_job中呼叫F1進行處理 foreach(fun(X) -> spawn_link(fun() ->do_job(ReducePid, F1, X) end) end, L), N = length(L), %% 用字典儲存鍵值 Dict0 = dict:new(), %% 等待map過程完成 Dict1 = collect_replies(N, Dict0), %% 呼叫F2按相同鍵值進行合併 Acc = dict:fold(F2, Acc0, Dict1), %% 向MapReduce程序通知執行結果 Parent ! {self(), Acc}. %% 按鍵值進行合併的過程 collect_replies(0, Dict) -> Dict; collect_replies(N, Dict) -> receive %% 對鍵-值的處理 %% 存在Key則將Val相加, 否則插入到字典 {Key, Val} -> case dict:is_key(Key, Dict) of true -> Dict1 = dict:append(Key, Val, Dict), collect_replies(N, Dict1); false -> Dict1 = dict:store(Key,[Val], Dict), collect_replies(N, Dict1) end; {'EXIT', _, _Why} -> collect_replies(N-1, Dict) end. %% 執行指定的map函式 do_job(ReducePid, F, X) -> F(ReducePid, X).
測試程式碼:
-module(test_mapreduce).
-compile(export_all).
-import(lists, [reverse/1, sort/1]).
test() ->
wc_dir(".").
wc_dir(Dir) ->
%% map函式
F1 = fun generate_words/2,
%% reduce函式
F2 = fun count_words/3,
%% 引數列表
Files = lib_find:files(Dir, ".*[.](erl)", false),
%% 呼叫mapreduce處理
L1 = phofs:mapreduce(F1, F2, [], Files),
reverse(sort(L1)).
%% 查詢檔案中的每個單詞
generate_words(Pid, File) ->
F = fun(Word) ->Pid ! {Word, 1} end,
lib_misc:foreachWordInFile(File, F).
%% 統計有多少個不同的單詞
count_words(Key, Vals, A) ->
[{length(Vals), Key}|A].
執行結果:
1> test_mapreduce:test().
[{115,"L"},
{84,"T"},
{80,"1"},
{77,"end"},
{72,"X"},
{52,"H"},
{47,"file"},
{46,"S"},
{44,"of"},
{43,"F"},
{40,"2"},
{39,"Key"},
{39,"Fun"},
{37,"is"},
{35,"case"},
{34,"fun"},
{34,"Pid"},
{34,"N"},
{33,"File"},
{32,"true"},
{31,"Str"},
{28,"ok"},
{27,"prefix"},
{27,"Val"},
{27,"I"},
{26,"to"},
{26,[...]},
{24,...},
{...}|...]
20.4.2 全文檢索
-
- 反向索引
檔案-內容對照表
- 反向索引
檔名 | 內容 |
---|---|
/home/dogs | rover jack buster winston |
/home/animals/cats | zorro daisy jaguar |
/home/cars | rover jaguar ford |
索引-檔案對照表
索引 | 檔名 |
---|---|
1 | /home/dogs |
2 | /home/animals/cats |
3 | /home/cars |
單詞-索引對照表
單詞 | 索引 |
---|---|
rover | 1,3 |
jack | 1 |
buster | 1 |
winston | 1 |
zorro | 2 |
daisy | 2 |
jaguar | 2,3 |
ford | 3 |
-
- 反向索引的查詢
通過單詞-索引, 索引-檔案的對照表查詢單詞與檔案的對應關係
- 反向索引的查詢
-
- 反向索引的資料結構
因為一個常見的詞可能在成千上萬的檔案中出現, 因此使用數字索引代替檔名可大大節省儲存空間, 因此需要檔案與索引的對照表。
對於每個在檔案中出現的單詞, 都需要記錄此檔案的索引號, 因此建立單詞與索引的對照表。
- 反向索引的資料結構
20.4.3 索引器的操作
%% 啟動一個名為indexer_server的伺服器程序
%% 啟動一個worker程序來執行索引動作
start() ->
indexer_server:start(output_dir()),
spawn_link(fun() ->worker() end).
worker() ->
possibly_stop(),
%% 返回下一個需要索引的目錄
case indexer_server:next_dir() of
{ok, Dir} ->
%% 查詢目錄下需要進行索引的檔案
Files = indexer_misc:files_in_dir(Dir),
%% 為其建立索引
index_these_files(Files),
%% 檢測是否正常完成
indexer_server:checkpoint(),
possibly_stop(),
sleep(10000),
worker();
done ->
true
end.
%% 使用MapReduce演算法實現建立索引的並行處理
index_these_files(Files) ->
Ets = indexer_server:ets_table(),
OutDir = filename:join(indexer_server:outdir(), "index"),
%% map函式
F1 = fun(Pid, File) ->indexer_words:words_in_file(Pid, File, Ets) end,
%% reduce函式
F2 = fun(Key, Val, Acc) ->handle_result(Key, Val, OutDir, Acc) end,
indexer_misc:mapreduce(F1, F2, 0, Files).
%% 按照Key值進行合併
handle_result(Key, Vals, OutDir, Acc) ->
add_to_file(OutDir, Key, Vals),
Acc + 1.
%% 將索引陣列新增到Word中
add_to_file(OutDir, Word, Is) ->
L1 = map(fun(I) -><<I:32>> end, Is),
OutFile = filename:join(OutDir, Word),
case file:open(OutFile, [write,binary,raw,append]) of
{ok, S} ->
file:pwrite(S, 0, L1),
file:close(S);
{error, E} ->
exit({ebadFileOp, OutFile, E})
end.
20.4.4 執行索引器
1> indexer:cold_start().
2> indexer:start().
3> indexer:stop().
20.4.5 評論
可以改進的三個方面
1. 改進單詞抽取
2. 改進map-reduce演算法, 以便處理海量資料
3. 方向索引的資料結構只使用了檔案系統來儲存