1. 程式人生 > >一個輕量快速的C++日誌庫

一個輕量快速的C++日誌庫

limlog

作一篇文章記錄實現,驅動優化迭代。 Github

很久之前就想寫一個日誌庫了,受限制於自己水平這個想法一直沒有完成。在上次寫一個 TCPServer 的時候,寫了一個錯誤的日誌庫,在多執行緒的情況對速度影響非常大,並且最重要的正確性也沒有保證。

近期又有了點時間,決定重新實現,對其 特點期望:

  • 正確性,這個是最重要也是最基本的,包括
    • 全部寫入.
    • 多個執行緒間的日誌不穿插干擾.
    • 日誌執行緒不能干擾主程式的執行邏輯.
  • 易讀性
    • 每條日誌記錄佔用一行空間,便於 awk 等工具的時候方便查詢.
    • 日誌資訊包含必要的資訊,包括日期時間、執行緒id、日誌等級、日誌發生的檔案和行數.
  • 易用性
    • 在類 printf 和 std::cout 的用法中選擇類 std::cout 用法.
    • 根據時間和日誌檔案大小自動滾動檔案.
    • 程式碼行數不超過 2000 行
  • 速度
    • 速度的重要性放在最後,執行緒增加(不超過CPU數量)的情況下,對速度的影響在30%內,每條日誌達到耗時在 1us 左右基本可以滿足要求.

日誌格式和日誌檔案格式期望如下:

  • 日誌檔案的格式為:檔名.日期.時間.日誌檔案滾動的索引.log
    test_log_file.20191224.160354.7.log
  • 日誌行的格式為 時間(精確到微秒)執行緒id 日誌等級 日誌資訊 - 檔名:呼叫函式:行
    20191224 16:04:05.125049 2970 ERROR 5266030 chello, world - LogTest.cpp:thr_1():20

之前看的兩個日誌庫,恰好都叫做 NanoLog:

  • Iyengar111/NanoLog, 跨平臺、C++11、實現簡單(不超過1000行),每個日誌行對應一個流物件、速度較快。
  • PlatformLab/NanoLog, 斯坦福一個實驗室的專案,Linux 平臺、C++17、日誌行非同步寫入,每個執行緒有一個區域性的迴圈位元組緩衝區,速度恐怖,是見過最快的日誌庫了,實現也非常細膩,優化的地方非常多,部分固定的日誌資訊直接編譯期確定下來。

前一個日誌庫學到了使用者介面巨集的定義方式,後一個 nanolog 則是吸收了其後端的設計部分,尤其是 thread_local 的使用,但我覺得它的精華在於前端部分,充斥著大量的模板。嗯,沒太看懂。

實現

前後端分離實現

  • 後端負責日誌資訊的落地處理,是整個日誌系統的重心。
  • 前端負責組織日誌資訊的格式。

後端實現

採用單例模式,全域性只有一個 LimLog 物件,在這個物件構造時也建立一個後臺執行緒,通過無限迴圈掃描是否有資料可以寫入至檔案中。通過條件變數控制該迴圈的終止及 LimLog 物件的銷燬。

使用 C++ 11 的關鍵字 thread_local 為每個執行緒都建立一個執行緒區域性緩衝區,這樣我們的前端的日誌寫入就可以不用加鎖(雖然不用加鎖,但是有另外的處理,後說明),每次都寫入到各自執行緒的緩衝區中。然後在上面的迴圈中掃描這些緩衝區,將其中的日誌資料讀取至內部的輸出緩衝區中,再將其寫入到檔案中。

每個緩衝區記憶體只建立一次使用,避免每次使用建立帶來不必要的申請消耗。每個執行緒區域性緩衝區的大小為 1M(1 << 20 bytes), LimLog 物件內的輸出緩衝區大小為 16M(1 << 24 bytes), 這個大小非常充足,實際測試 大概每個執行緒的記憶體使用為 30k(1M) 和 60-150k(16M) 左右,這個地方需要IO壓測一下。

後端日誌類的成員變數如下,


class LimLog{
    ···
  private:
    LogSink logSink_;
    bool threadSync_; // 前後端同步的標示
    bool threadExit_; // 後臺執行緒標示
    bool outputFull_; // 輸出緩衝區滿的標示
    LogLevel level_;
    uint32_t bufferSize_;        // 輸出緩衝區的大小
    uint32_t sinkCount_;         // 寫入檔案的次數
    uint32_t perConsumeBytes_;   // 每輪迴圈讀取的位元組數
    uint64_t totalConsumeBytes_; // 總讀取的位元組數
    char *outputBuffer_;         // first internal buffer.
    char *doubleBuffer_;         // second internal buffer, unused.
    std::vector<BlockingBuffer *> threadBuffers_;
    std::thread sinkThread_;
    std::mutex bufferMutex_; // internel buffer mutex.
    std::mutex condMutex_;
    std::condition_variable proceedCond_;  // 後臺執行緒是否繼續執行的標條件變數
    std::condition_variable hitEmptyCond_; // 前端緩衝區為空的條件變數
    static LimLog singletonLog;
    static thread_local BlockingBuffer *blockingBuffer_; // 執行緒區域性緩衝區
};

LimLog 物件的析構和後臺執行緒的退出需要十分的謹慎,這裡程式出問題的重災區,前一個日誌庫就是這個地方沒有處理好,導致有的時候日誌還沒有寫完程式就退出了。為了保證這個邏輯的正確執行,採用了兩個條件變數 proceedCond_hitEmptyCond_。在執行物件的解構函式時,需要等待無可讀資料的條件變數 hitEmptyCond_, 保證這個時候執行緒區域性緩衝區的資料都已經讀取完成,然後在解構函式中設定後臺執行緒迴圈退出的條件,並且使用 proceedCond_ 通知後臺執行緒執行。

以上邏輯在解構函式中的邏輯如下:

LimLog::~LimLog() {
    {
        // notify background thread befor the object detoryed.
        std::unique_lock<std::mutex> lock(singletonLog.condMutex_);
        singletonLog.threadSync_ = true;
        singletonLog.proceedCond_.notify_all();
        singletonLog.hitEmptyCond_.wait(lock);
    }

    {
        // stop sink thread.
        std::lock_guard<std::mutex> lock(condMutex_);
        singletonLog.threadExit_ = true;
        singletonLog.proceedCond_.notify_all();
    }
    ···
}

之後就轉入到後臺執行緒的迴圈處理中,以下是所有後臺執行緒處理的邏輯:

// Sink log info to file with async.
void LimLog::sinkThreadFunc() {
    // \fixed me, it will enter infinity if compile with -O3 .
    while (!threadExit_) {
        // move front-end data to internal buffer.
        {
            std::lock_guard<std::mutex> lock(bufferMutex_);
            uint32_t bbufferIdx = 0;
            // while (!threadExit_ && !outputFull_ && !threadBuffers_.empty()) {
            while (!threadExit_ && !outputFull_ &&
                   (bbufferIdx < threadBuffers_.size())) {
                BlockingBuffer *bbuffer = threadBuffers_[bbufferIdx];
                uint32_t consumableBytes = bbuffer->consumable(); // 如果這個地方使用 used() 代替,就會出現日誌行截斷的現象

                if (bufferSize_ - perConsumeBytes_ < consumableBytes) {
                    outputFull_ = true;
                    break;
                }

                if (consumableBytes > 0) {
                    uint32_t n = bbuffer->consume(
                        outputBuffer_ + perConsumeBytes_, consumableBytes);
                    perConsumeBytes_ += n;
                } else {
                    // threadBuffers_.erase(threadBuffers_.begin() +
                    // bbufferIdx);
                }
                bbufferIdx++;
            }
        }

        // not data to sink, go to sleep, 50us.
        if (perConsumeBytes_ == 0) {
            std::unique_lock<std::mutex> lock(condMutex_);

            // if front-end generated sync operation, consume again.
            if (threadSync_) {
                threadSync_ = false;
                continue;
            }

            hitEmptyCond_.notify_one();
            proceedCond_.wait_for(lock, std::chrono::microseconds(50));
        } else {
            logSink_.sink(outputBuffer_, perConsumeBytes_);
            sinkCount_++;
            totalConsumeBytes_ += perConsumeBytes_;
            perConsumeBytes_ = 0;
            outputFull_ = false;
        }
    }
}

當掃描發現無資料可取時,這個時候我們再迴圈讀取一下,防止解構函式在掃描資料和判斷 threadSync_ 執行的時間差之內又有日誌產生,在又一輪迴圈後,確保所有的資料都已經讀取,通知解構函式繼續執行,然後退出迴圈,該後臺執行緒函式退出,執行緒銷燬。 wait_for 是節約系統資源,出現無資料時當前執行緒就直接休眠 50us 避免不必要的迴圈消耗.

以上是對後臺執行緒的處理,對資料的處理其實還是比較簡單的,後端提供了一個寫入資料的介面,該介面將前端產生的資料統一寫入至執行緒區域性緩衝區中。

void LimLog::produce(const char *data, size_t n) {
    if (!blockingBuffer_) {
        std::lock_guard<std::mutex> lock(bufferMutex_);
        blockingBuffer_ =
            static_cast<BlockingBuffer *>(malloc(sizeof(BlockingBuffer)));
        threadBuffers_.push_back(blockingBuffer_);  // 新增到後端的緩衝區陣列中
    }

    blockingBuffer_->produce(data, n); // 將資料新增到執行緒區域性緩衝區中
}

後臺寫入執行緒這個時候,遍歷緩衝陣列,讀取資料至內部的輸出緩衝區中,當出現可讀取的資料大於緩衝區時,直接寫入檔案,剩餘的資料下一路再讀取;當無可讀取的資料時,判斷是物件析構了還只是簡單的沒有資料讀取,並且休眠 50us;有資料可讀則寫入至檔案,重置相關資料。

執行緒區域性緩衝區為一個阻塞環形生產者消費者佇列,設計為阻塞態的原因為緩衝區的大小足夠大了,如果改為非阻塞態,當緩衝區的滿了的時候重新分配記憶體,還要管理一次記憶體的釋放操作。雖然執行緒內的操作不需要使用鎖來同步,在後臺執行緒的迴圈範圍內,每次都要獲取資料的可讀大小,這裡可能就涉及多個執行緒對資料的訪問了。最初的版本測試在沒有同步並且開啟優化的情況下,有一定概率在 consume() 操作陷入死迴圈,原因是該執行緒的 cache 的變數未及時得到更新。現在使用記憶體屏障來解決這個問題,避免編譯器優化。

對於後臺執行緒讀取各執行緒內的資料有一個隨機性的問題,如果使用 BlockingBuffer::unsed()獲取已讀資料長度,可能會出現日誌行只被讀取了一半,然後立刻被另外的一個執行緒的日誌截斷的情況,這不是我們期望的。為了解決這個問題,我們又提供了一個介面 BlockingBuffer::incConsumablePos() 來移動可以讀取的位置,該介面每次遞增的長度為一條日誌行的長度,在一日誌行資料寫入到區域性緩衝區後呼叫該介面來移動可以讀取到的位置。而又提供介面 BlockingBuffer::consumable() 來獲取可以讀取的長度,這樣就避免了一條日誌行被截斷的情況。

前端實現

一條日誌資訊在記憶體中的佈局如下:
+------+-----------+-------+------+------+----------+------+
| time | thread id | level | logs | file | function | line |
+------+-----------+-------+------+------+----------+------+

20191224 16:04:05.125044 2970 ERROR  5266025chello, world - LogTest.cpp:thr_1():20
20191224 16:04:05.125044 2970 ERROR  5266026chello, world - LogTest.cpp:thr_1():20
20191224 16:04:05.125045 2970 ERROR  5266027chello, world - LogTest.cpp:thr_1():20

前端實現雖然簡單,但是優化的空間很大,使用 perf 做效能分析,效能熱點集中在引數格式化為字串和時間的處理上面。

我們用 LogLine 來表示一個日誌行,這個類非常簡單,過載了多個引數型別的操作符 <<, 及儲存一些日誌行相關資訊,包括 檔名稱,函式,行和最重要的寫入位元組數。
LogLine 不包含緩衝區,直接呼叫後端提供的介面 LimLog::produce() 將資料寫入到區域性緩衝區內。

limlog 中,多次使用到了 thread_local 關鍵字,在前端實現部分也是如此。

time 的處理

在 Linux 中使用 gettimeofday(2) 來獲取當前的時間戳,其他平臺使用 std::chrono 來獲取時間戳,用 localtime()strftime() 獲取本地的格式化時間 %Y%m%d %H:%M:%S, 合併微秒 timestamp % 1000000 組成 time.

這裡利用 thread_local 做一個優化,格式化時間的函式呼叫次數。定義一個執行緒區域性變數來儲存時間戳對應的秒數 t_second 和 字元陣列來儲存格式化時間 t_datetime[24],當秒數未發生改變時,直接取執行緒區域性字元陣列而不用再呼叫格式化函式。

在 x86-64 體系下,gettimeofday(2) 在 vdso 機制下已經不是系統呼叫了,經測試發現直接呼叫 gettimeofday(2)std::chrono 的高精度時鐘快 15% 左右,雖然不是系統呼叫但是耗時還是大頭,一個呼叫大概需要 600ns 左右的時間。

thread id 的獲取

在 Linux 下,使用 gettid() 而不是 pthread_self() 來獲取執行緒id,windows 暫時未支援,這個 std::this_thread::get_id() 取出裡面的整形id需要hack一下。

這裡繼續使用 thread_local 來對執行緒id優化,每個執行緒的id呼叫一次執行緒id獲取函式。建立一個執行緒區域性執行緒id變數,判斷執行緒id存在時,就不再呼叫 gettid() 了。

日誌行的其他項

日誌等級、檔案、函式和行數是簡單的字串寫入操作了。

用法

#define LOG(level)                                                             \
    if (limlog::getLogLevel() <= level)                                        \
    LogLine(level, __FILE__, __FUNCTION__, __LINE__)

#define LOG_TRACE LOG(limlog::LogLevel::TRACE)
#define LOG_DEBUG LOG(limlog::LogLevel::DEBUG)
#define LOG_INFO LOG(limlog::LogLevel::INFO)
#define LOG_WARN LOG(limlog::LogLevel::WARN)
#define LOG_ERROR LOG(limlog::LogLevel::ERROR)
#define LOG_FATAL LOG(limlog::LogLevel::FATAL)

如上所示,定義了一些巨集給呼叫者使用,每一條日誌都會建立一個 LogLine 物件,這個物件非常小,用完就銷燬,幾乎沒有開銷。用法同 std::cout

std::string str("std::string");
LOG_DEBUG << 'c' << 1 << 1.3 << "c string" << str;

TODO

  • 支援跨平臺
    • 支援執行緒id的獲取
  • 檔案的自動滾動
    • 根據時間(每天)自動滾動檔案
  • 速度
    • LogLine::operator<<() 引數轉字串做優化
    • 日誌行的時間戳獲取優化,這裡的時間佔去60%左右的開銷
  • 測試
    • 速度
    • 正確性

參考

  1. Iyengar111/NanoLog, Low Latency C++11 Logging Library.
  2. PlatformLab/NanoLog, Nanolog is an extremely performant nanosecond scale logging system for C++ that exposes a simple printf-like API.
  3. kfifo, 環形生產者消費者佇列.
  4. memory barries, 核心的記憶體屏障.