rabbitmq中訊息的儲存
1. 大概原理:
所有佇列中的訊息都以append的方式寫到一個檔案中,當這個檔案的大小超過指定的限制大小後,關閉這個檔案再建立一個新的檔案供訊息的寫入。檔名(*.rdq)從0開始然後依次累加。當某個訊息被刪除時,並不立即從檔案中刪除相關資訊,而是做一些記錄,當垃圾資料達到一定比例時,啟動垃圾回收處理,將邏輯相鄰的檔案中的資料合併到一個檔案中。
2. 訊息的讀寫及刪除:
rabbitmq在啟動時會建立msg_store_persistent,msg_store_transient兩個程序,一個用於持久訊息的儲存,一個用於記憶體不夠時,將儲存在記憶體中的非持久化資料轉存到磁碟中。所有佇列的訊息的寫入和刪除最終都由這兩個程序負責處理,而訊息的讀取則可能是佇列本身直接開啟檔案進行讀取,也可能是傳送請求由msg_store_persisteng/msg_store_transient程序進行處理。
在進行訊息的儲存時,rabbitmq會在ets表中記錄訊息在檔案中的對映,以及檔案的相關資訊。訊息讀取時,根據訊息ID找到該訊息所儲存的檔案,在檔案中的偏移量,然後開啟檔案進行讀取。訊息的刪除只是從ets表刪除指定訊息的相關資訊,同時更新訊息對應儲存的檔案的相關資訊(更新檔案有效資料大小)。
-record(msg_location, { msg_id, %%訊息ID ref_count, %%引用計數 file, %%訊息儲存的檔名 offset, %%訊息在檔案中的偏移量 total_size %%訊息的大小 }). -record(file_summary, { file, %%檔名 valid_total_size, %%檔案有效資料大小 left, %%位於該檔案左邊的檔案 right, %%位於該檔案右邊的檔案 file_size, %%檔案總的大小 locked, %%上鎖標記 垃圾回收時防止對檔案進行操作 readers %%當前讀檔案的佇列數 })
3. 垃圾回收:
由於執行訊息刪除操作時,並不立即對在檔案中對訊息進行刪除,也就是說訊息依然在檔案中,僅僅是垃圾資料而已。當垃圾資料超過一定比例後(預設比例為50%),並且至少有三個及以上的檔案時,rabbitmq觸發垃圾回收。垃圾回收會先找到符合要求的兩個檔案(根據#file_summary{}中left,right找邏輯上相鄰的兩個檔案,並且兩個檔案的有效資料可在一個檔案中儲存),然後鎖定這兩個檔案,並先對左邊檔案的有效資料進行整理,再將右邊檔案的有效資料寫入到左邊檔案,同時更新訊息的相關資訊(儲存的檔案,檔案中的偏移量),檔案的相關資訊(檔案的有效資料,左邊檔案,右邊檔案),最後將右邊的檔案刪除。
4. 效能考慮:
(1)操作引用計數(flying_ets)
佇列在進行訊息的寫入和刪除操作前,會在flying_ets表裡通過+1,-1的方式進行計數,然後投遞請求給msg_store_persistent/msg_store_transient程序進行處理,程序在真正寫操作或者刪除之前會再次判斷flying_ets中對應訊息的計數決定是否需要進行相應操作。這樣,對於頻繁寫入和刪除的操作,概率減少實際的寫入和刪除。
client_write(MsgId, Msg, Flow,
CState=#client_msstate{cur_file_cache_ets=CurFileCacheEts,
client_ref=CRef}) ->
ok = client_update_flying(+1, MsgId, CState),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgId, Flow}).
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
[client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
client_update_flying(Diff, MsgId,
#client_msstate{flying_ets = FlyingEts,
client_ref = CRef}) ->
Key = {MsgId, CRef},
case ets:insert_new(FlyingEts, {Key, Diff}) of
true ->
ok;
false ->
try ets:update_counter(FlyingEts, Key, {2, Diff}) of
...
end.
handle_cast({write, CRef, MsgId, Flow},
State = #msstate{cur_file_cache_ets=CurFileCacheEts,
clients=Clients}) ->
...
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case update_flying(-1, MsgId, CRef, State) of
process ->
[{MsgId,Msg,_PWC}]=ets:lookup(CurFileCacheEts, MsgId),
noreply(write_message(MsgId, Msg, CRef, State));
ignore ->
...
end;
handle_cast({remove, CRef, MsgIds}, State) ->
{RemovedMsgIds, State1} =
lists:foldl(
fun (MsgId, {Removed, State2}) ->
case update_flying(+1, MsgId, CRef, State2) of
process ->
{[MsgId | Removed],
remove_message(MsgId, CRef, State2)};
ignore ->
{Removed, State2}
end
end, {[], State}, MsgIds),
...
update_flying(Diff,MsgId,CRef,#msstate{flying_ets = FlyingEts }) ->
Key = {MsgId, CRef},
NDiff = -Diff,
case ets:lookup(FlyingEts, Key) of
[] ->
ignore;
[{_, Diff}] ->
ignore;
[{_, NDiff}] ->
ets:update_counter(FlyingEts, Key, {2, Diff}),
true = ets:delete_object(FlyingEts, {Key, 0}),
process;
[{_, 0}] ->
true = ets:delete_object(FlyingEts, {Key, 0}),
ignore;
[{_, Err}] ->
throw({bad_flying_ets_record, Diff, Err, Key})
end.
(2)儘可能的併發讀
在讀取訊息的時候,都先根據訊息ID找到對應儲存的檔案,如果檔案存在並且未被鎖住,則直接開啟檔案,從指定位置讀取訊息的內容。
如果訊息儲存的檔案被鎖住了,或者對應的檔案不存在了,則傳送請求,由msg_store_persistent/msg_store_transient程序進行處理。
(3)訊息快取
1)利用ets表進行快取
對於當前正在寫的檔案,所有訊息在寫入前都會在cur_file_cache_ets表中存一份,訊息讀取時會優先從這裡進行查詢。檔案關閉時,會將cur_file_cache_ets表中引用計數為0的訊息進行清除。
2)file_handle_cache的寫快取
rabbitmq中對檔案的操作封轉到了file_handle_cache模組,以寫模式開啟檔案時,預設有1M大小的快取,即在進行檔案的寫操作時,是先寫入到這個快取中,當快取超過大小或者顯式重新整理,才將快取中的內容刷入磁碟中。
rabbit_msg_store.erl
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
open_file(Dir, FileName, Mode) ->
file_handle_cache:open(form_filename(Dir, FileName),
?BINARY_MODE ++ Mode,
[{write_buffer,?HANDLE_CACHE_BUFFER_SIZE}]).
file_handle_cache.erl
append(Ref,Data) ->
with_handles(
[Ref],
fun ([#handle { is_write = false }]) ->
{error, not_open_for_writing};
([Handle]) ->
case maybe_seek(eof, Handle) of
{{ok, _Offset}, #handle{hdl = Hdl,
offset = Offset,
write_buffer_size_limit = 0,
at_eof = true }= Handle1} ->
Offset1 = Offset + iolist_size(Data),
{prim_file:write(Hdl, Data),
[Handle1#handle{is_dirty=true,offset=Offset1 }]};
{{ok, _Offset},#handle{write_buffer = WriteBuffer,
write_buffer_size = Size,
write_buffer_size_limit= Limit,
at_eof = true } = Handle1} ->
WriteBuffer1 = [Data | WriteBuffer],
Size1 = Size + iolist_size(Data),
Handle2=Handle1#handle{write_buffer=WriteBuffer1,
write_buffer_size=Size1},
case Limit =/= infinity andalso Size1 > Limit of
true ->
{Result,Handle3} = write_buffer(Handle2),
{Result, [Handle3]};
false ->
{ok, [Handle2]}
end;
{{error, _} = Error, Handle1} ->
{Error, [Handle1]}
end
end).