1. 程式人生 > >rabbitmq中訊息的儲存

rabbitmq中訊息的儲存

1. 大概原理:

所有佇列中的訊息都以append的方式寫到一個檔案中,當這個檔案的大小超過指定的限制大小後,關閉這個檔案再建立一個新的檔案供訊息的寫入。檔名(*.rdq)從0開始然後依次累加。當某個訊息被刪除時,並不立即從檔案中刪除相關資訊,而是做一些記錄,當垃圾資料達到一定比例時,啟動垃圾回收處理,將邏輯相鄰的檔案中的資料合併到一個檔案中。

2. 訊息的讀寫及刪除:

rabbitmq在啟動時會建立msg_store_persistent,msg_store_transient兩個程序,一個用於持久訊息的儲存,一個用於記憶體不夠時,將儲存在記憶體中的非持久化資料轉存到磁碟中。所有佇列的訊息的寫入和刪除最終都由這兩個程序負責處理,而訊息的讀取則可能是佇列本身直接開啟檔案進行讀取,也可能是傳送請求由msg_store_persisteng/msg_store_transient程序進行處理。

在進行訊息的儲存時,rabbitmq會在ets表中記錄訊息在檔案中的對映,以及檔案的相關資訊。訊息讀取時,根據訊息ID找到該訊息所儲存的檔案,在檔案中的偏移量,然後開啟檔案進行讀取。訊息的刪除只是從ets表刪除指定訊息的相關資訊,同時更新訊息對應儲存的檔案的相關資訊(更新檔案有效資料大小)。

-record(msg_location, { msg_id,     %%訊息ID
                        ref_count,  %%引用計數
                        file,       %%訊息儲存的檔名
                        offset,     %%訊息在檔案中的偏移量
                        total_size  %%訊息的大小
                      }).

-record(file_summary, { file,       %%檔名
                        valid_total_size, %%檔案有效資料大小
                        left,       %%位於該檔案左邊的檔案
                        right,      %%位於該檔案右邊的檔案
                        file_size,  %%檔案總的大小
                        locked,     %%上鎖標記 垃圾回收時防止對檔案進行操作
                        readers     %%當前讀檔案的佇列數
                      })

3. 垃圾回收:

由於執行訊息刪除操作時,並不立即對在檔案中對訊息進行刪除,也就是說訊息依然在檔案中,僅僅是垃圾資料而已。當垃圾資料超過一定比例後(預設比例為50%),並且至少有三個及以上的檔案時,rabbitmq觸發垃圾回收。垃圾回收會先找到符合要求的兩個檔案(根據#file_summary{}中left,right找邏輯上相鄰的兩個檔案,並且兩個檔案的有效資料可在一個檔案中儲存),然後鎖定這兩個檔案,並先對左邊檔案的有效資料進行整理,再將右邊檔案的有效資料寫入到左邊檔案,同時更新訊息的相關資訊(儲存的檔案,檔案中的偏移量),檔案的相關資訊(檔案的有效資料,左邊檔案,右邊檔案),最後將右邊的檔案刪除。


4. 效能考慮:

(1)操作引用計數(flying_ets)

佇列在進行訊息的寫入和刪除操作前,會在flying_ets表裡通過+1,-1的方式進行計數,然後投遞請求給msg_store_persistent/msg_store_transient程序進行處理,程序在真正寫操作或者刪除之前會再次判斷flying_ets中對應訊息的計數決定是否需要進行相應操作。這樣,對於頻繁寫入和刪除的操作,概率減少實際的寫入和刪除。

client_write(MsgId, Msg, Flow,
             CState=#client_msstate{cur_file_cache_ets=CurFileCacheEts,
                                    client_ref=CRef}) ->
    ok = client_update_flying(+1, MsgId, CState),
    ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
    ok = server_cast(CState, {write, CRef, MsgId, Flow}).

remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
    [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
    server_cast(CState, {remove, CRef, MsgIds}).

client_update_flying(Diff, MsgId,
                     #client_msstate{flying_ets = FlyingEts,
                                     client_ref = CRef}) ->
    Key = {MsgId, CRef},
    case ets:insert_new(FlyingEts, {Key, Diff}) of
        true  ->
            ok;
        false ->
            try ets:update_counter(FlyingEts, Key, {2, Diff}) of
            ...
    end.

handle_cast({write, CRef, MsgId, Flow},
            State = #msstate{cur_file_cache_ets=CurFileCacheEts,
                             clients=Clients}) ->
    ...
    true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
    case update_flying(-1, MsgId, CRef, State) of
        process ->
            [{MsgId,Msg,_PWC}]=ets:lookup(CurFileCacheEts, MsgId),
            noreply(write_message(MsgId, Msg, CRef, State));
        ignore ->
            ...
    end;

handle_cast({remove, CRef, MsgIds}, State) ->
    {RemovedMsgIds, State1} =
        lists:foldl(
            fun (MsgId, {Removed, State2}) ->
                case update_flying(+1, MsgId, CRef, State2) of
                    process ->
                        {[MsgId | Removed],
                        remove_message(MsgId, CRef, State2)};
                    ignore ->
                        {Removed, State2}
                end
            end, {[], State}, MsgIds),
    ...

update_flying(Diff,MsgId,CRef,#msstate{flying_ets = FlyingEts }) ->
    Key = {MsgId, CRef},
    NDiff = -Diff,
    case ets:lookup(FlyingEts, Key) of
        [] ->
            ignore;
        [{_,  Diff}] ->
            ignore;
        [{_, NDiff}] ->
            ets:update_counter(FlyingEts, Key, {2, Diff}),
            true = ets:delete_object(FlyingEts, {Key, 0}),
            process;
        [{_, 0}] ->
            true = ets:delete_object(FlyingEts, {Key, 0}),
            ignore;
        [{_, Err}] ->
            throw({bad_flying_ets_record, Diff, Err, Key})
    end.

(2)儘可能的併發讀

在讀取訊息的時候,都先根據訊息ID找到對應儲存的檔案,如果檔案存在並且未被鎖住,則直接開啟檔案,從指定位置讀取訊息的內容。

如果訊息儲存的檔案被鎖住了,或者對應的檔案不存在了,則傳送請求,由msg_store_persistent/msg_store_transient程序進行處理。

(3)訊息快取

1)利用ets表進行快取 

對於當前正在寫的檔案,所有訊息在寫入前都會在cur_file_cache_ets表中存一份,訊息讀取時會優先從這裡進行查詢。檔案關閉時,會將cur_file_cache_ets表中引用計數為0的訊息進行清除。

2)file_handle_cache的寫快取

rabbitmq中對檔案的操作封轉到了file_handle_cache模組,以寫模式開啟檔案時,預設有1M大小的快取,即在進行檔案的寫操作時,是先寫入到這個快取中,當快取超過大小或者顯式重新整理,才將快取中的內容刷入磁碟中。

rabbit_msg_store.erl

-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB

open_file(Dir, FileName, Mode) ->
    file_handle_cache:open(form_filename(Dir, FileName),
                           ?BINARY_MODE ++ Mode,
                           [{write_buffer,?HANDLE_CACHE_BUFFER_SIZE}]).

file_handle_cache.erl

append(Ref,Data) ->
    with_handles(
        [Ref],
        fun ([#handle { is_write = false }]) ->
           {error, not_open_for_writing};
        ([Handle]) ->
            case maybe_seek(eof, Handle) of
                {{ok, _Offset}, #handle{hdl = Hdl,
                                        offset = Offset,
                                        write_buffer_size_limit = 0,
                                        at_eof = true }= Handle1} ->
                    Offset1 = Offset + iolist_size(Data),
                    {prim_file:write(Hdl, Data),
                    [Handle1#handle{is_dirty=true,offset=Offset1 }]};
                {{ok, _Offset},#handle{write_buffer = WriteBuffer,
                                       write_buffer_size = Size,
                                       write_buffer_size_limit= Limit,
                                       at_eof = true } = Handle1} ->
                    WriteBuffer1 = [Data | WriteBuffer],
                    Size1 = Size + iolist_size(Data),
                    Handle2=Handle1#handle{write_buffer=WriteBuffer1,
                                           write_buffer_size=Size1},
                    case Limit =/= infinity andalso Size1 > Limit of
                        true  ->
                            {Result,Handle3} = write_buffer(Handle2),
                            {Result, [Handle3]};
                        false ->
                            {ok, [Handle2]}
                    end;
                {{error, _} = Error, Handle1} ->
                    {Error, [Handle1]}
            end
        end).