1. 程式人生 > >Zookeeper C++程式設計實戰之配置更新

Zookeeper C++程式設計實戰之配置更新

CZookeeperHelper:
https://github.com/eyjian/libmooon/blob/master/include/mooon/net/zookeeper_helper.h

CMainHelper:
https://github.com/eyjian/libmooon/blob/master/include/mooon/sys/main_template.h

// 演示一個多執行緒程式如何藉助zookeeper,實現配置的動態更新
//
// 實現理念(有些場景不適合):
// 1) 讓執行緒不涉及配置的動態更新,這樣避免了動態更新配置
// 2) 通過建立新執行緒的方式達到配置動態更新的目的,老的執行緒直接退出
// 3) 先建立新執行緒,再退出老執行緒,保持服務不中斷
//
// 實際上,也可以通過父子程序方式來達到配置動態更新,
// 父程序檢測到配置更新後,父程序讀取配置,並檢查配置的合法性。
// 如果合法則建立新的子程序,完成後再kill原有的子程序,
// 這樣子程序就不涉及配置更新邏輯。
//
// 這兩種方法,均可比較簡單應對複雜的配置動態更新,
// 但如果新舊配置無法同時相容,則需要先停掉老的執行緒或程序,
// 然後再啟動新的執行緒或程序,否則做到無縫地動態更新。
//
// 編譯要求環境:C++11或更高
// 編譯語句大致如下:
// g++ -g -o b zk_conf_example.cpp -I/usr/local/mooon/include -I/usr/local/zookeeper/include /usr/local/mooon/lib/libmooon.a /usr/local/zookeeper/lib/libzookeeper_mt.a -pthread -std=c++11 -DMOOON_HAVE_ZOOKEEPER=1 -lz
#include <mooon/net/zookeeper_helper.h>
#include <mooon/sys/datetime_utils.h> // 格式化時間也可以考慮C++標準庫提供的std::put_time
#include <mooon/sys/main_template.h>
#include <mooon/utils/args_parser.h>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <system_error>
#include <thread>

// 指定存放配置的zookeeper
STRING_ARG_DEFINE(zookeeper, "", "Comma separated list of servers in the ZooKeeper Quorum, example: --zookeeper=127.0.0.1:2181");

class CMyApplication;

// 負責具體業務的工作者(執行緒)
class CWorker
{
public:
    CWorker(CMyApplication* app, int index);
    void run(); // 執行緒入口函式
    void stop() { _stop = true; }

private:
    CMyApplication* _app;
    int _index;
    volatile bool _stop;
};

// 應用程式主類(或叫上下文類,也可叫入口類)
// 通過繼承CZookeeperHelper,獲得zookeeper操作能力,
// 包括讀寫zookeeper資料能力、發現配置更新能力和主備切換能力。
//
// 可繼承mooon::sys::CMainHelper,
// 以獲得通過訊號SIGTERM的優雅退出能力,
// CMainHelper提供了優雅和安全的訊號處理,
// 預設的優雅退出訊號為SIGTERM,可自定義為其它訊號。
class CMyApplication: public mooon::net::CZookeeperHelper, public mooon::sys::CMainHelper
{
public:
    CMyApplication();

private:
    // num_workers 需要啟動的CWorker個數
    bool start_workers(
        std::vector<std::thread>* work_threads,
        std::vector<std::shared_ptr<CWorker>>* workers,
        int num_workers);
    void stop_workers(
        std::vector<std::thread>* work_threads,
        std::vector<std::shared_ptr<CWorker>>* workers);
    // 當zookeeper的會話過期後,
    // 需要呼叫recreate_zookeeper_session重新建立會話
    void recreate_zookeeper_session();

// 實現父類CMainHelper定義的虛擬函式(實為回撥函式),
// 以下五個“on_”函式,均執行在獨立的訊號執行緒中,而不是主執行緒中。
private:
    // 主執行緒的呼叫順序:
    //    main()
    // -> on_check_parameter() -> on_init()
    // -> on_run() -> on_fini()
    //
    // 注意on_terminated()是由訊號觸發的,
    // 由獨立的訊號執行緒呼叫,但位於on_init()之後。
    virtual bool on_check_parameter();
    virtual bool on_init(int argc, char* argv[]);
    virtual bool on_run(); // 這裡使得配置動態生效
    virtual void on_fini();
    virtual void on_terminated();

// 實現父類CZookeeperHelper定義的虛擬函式(實為回撥函式)
// 以下五個“on_”函式,均執行在獨立的zookeeper執行緒中,而不是主執行緒中。
private:
    virtual void on_zookeeper_session_connected(const char* path);
    virtual void on_zookeeper_session_connecting(const char* path);
    virtual void on_zookeeper_session_expired(const char *path);
    virtual void on_zookeeper_session_event(int state, const char *path);
    virtual void on_zookeeper_event(int type, int state, const char *path);

private:
    volatile bool _stop;
    std::mutex _mutex;
    std::condition_variable _cond;
    std::vector<std::thread> _work_threads;
    std::vector<std::shared_ptr<CWorker>> _workers;

private:
    volatile bool _conf_changed; // 配置發生變化
    volatile bool _zookeeper_session_expired; // zookeeper的會話(session)過期
    std::string _zk_nodes; // 存放配置的zookeeper節點列表
    std::string _conf_zkpath; // 配置的zookeeper節點路徑
};

int main(int argc, char* argv[])
{
    CMyApplication app;
    return mooon::sys::main_template(&app, argc, argv);
}

static unsigned long long get_current_thread_id()
{
    std::stringstream ss;
    ss << std::this_thread::get_id();
    return std::stoull(ss.str());
}

CMyApplication::CMyApplication()
    : _stop(false), _conf_changed(false), _zookeeper_session_expired(false)
{
    _conf_zkpath = "/tmp/conf";
}

bool CMyApplication::on_check_parameter()
{
    // 命令列引數“--zookeeper”不能為空
    return !mooon::argument::zookeeper->value().empty();
}

bool CMyApplication::on_init(int argc, char* argv[])
{
    try
    {
        // 以this方式呼叫的函式,均為CZookeeperHelper提供
        _zk_nodes = mooon::argument::zookeeper->value();
        this->create_session(_zk_nodes);

        // zookeeper的會話(session)是非同步建立的,
        // 只有連線成功後,方可讀取存放在zookeeper上的配置資料。
        for (int i=0; i<5&&!_stop; ++i)
        {
            if (this->is_connected())
                break;
            else
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }

        if (!this->is_connected())
        {
            fprintf(stderr, "Can not connect zookeeper://%s\n", _zk_nodes.c_str());
            return false;
        }
        else
        {
            // 取zookeeper節點資料
            std::string zkdata;
            int n = get_zk_data(_conf_zkpath.c_str(), &zkdata, 4);
            if (n > 4 || zkdata.empty())
            {
                // 配置資料的大小超出預期
                fprintf(stderr, "conf size error: %d\n", n);
                return false;
            }
            else
            {
                // 如果zkdata不是一個有效的數字,
                // stoi會丟擲異常invalid_argument
                const int num_workers = std::stoi(zkdata);

                if (num_workers < 1 || num_workers > 10)
                {
                    fprintf(stderr, "conf error: %d\n", num_workers);
                    return false;
                }
                else
                {
                    return start_workers(&_work_threads, &_workers, num_workers);
                }
            }
        }
    }
    catch (std::invalid_argument& ex)
    {
        fprintf(stderr, "%s\n", ex.what());
        return false;
    }
    catch (mooon::sys::CSyscallException& ex)
    {
        fprintf(stderr, "%s\n", ex.str().c_str());
        return false;
    }
    catch (mooon::utils::CException& ex)
    {
        fprintf(stderr, "%s\n", ex.str().c_str());
        return false;
    }
}

bool CMyApplication::on_run()
{
    while (!_stop)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock); // 等待配置更新或收到退出指令
        if (_stop)
        {
            break;
        }

        // 以下實現省略了函式呼叫拋異常處理
        if (_zookeeper_session_expired)
        {
            // 如果會話過期,則需要重新建會話
            recreate_zookeeper_session();
        }
        if (_stop)
        {
            // 在建立會話過程中,可能收到了停止指令
            break;
        }
        if (_conf_changed)
        {
            _conf_changed = false;

            // 讀取新的配置
            std::string zkdata;
            int n = get_zk_data(_conf_zkpath.c_str(), &zkdata, 4);
            if (n > 4)
            {
                // 這種情況下應觸發告警
                // 配置資料的大小超出預期
                fprintf(stderr, "conf size error: %d\n", n);
            }
            else
            {
                // 這裡可考慮加上優化:
                // 只有配置確實發生變化時才進行後續操作。
                const int num_workers = std::stoi(zkdata);

                if (num_workers < 1 || num_workers > 10)
                {
                    // 這種情況下應觸發告警
                    fprintf(stderr, "conf error: %d\n", num_workers);
                }
                else
                {
                    std::vector<std::thread> work_threads;
                    std::vector<std::shared_ptr<CWorker>> workers;

                    // 新的配置生效,才停掉原來的,
                    // 防止因為誤操破壞配置,導致整個系統崩潰
                    if (!start_workers(&work_threads, &workers, num_workers))
                    {
                        // 這種情況下應觸發告警
                    }
                    else
                    {
                        stop_workers(&_work_threads, &_workers);
                        _work_threads.swap(work_threads);
                        _workers.swap(workers);
                    }
                }
            }
        }
    }

    return true;
}

void CMyApplication::on_fini()
{
    // 應用退出時被呼叫
    fprintf(stdout, "Application is about to quit\n");
}

// 接收到了SIGTERM訊號
void CMyApplication::on_terminated()
{
    // 一定要最先呼叫父類CMainHelper的on_terminated
    mooon::sys::CMainHelper::on_terminated();

    _stop = true;
    stop_workers(&_work_threads, &_workers);

    std::unique_lock<std::mutex> lock(_mutex);
    _cond.notify_one(); // 喚醒等待狀態的CMyApplication::run
}

bool CMyApplication::start_workers(
    std::vector<std::thread>* work_threads,
    std::vector<std::shared_ptr<CWorker>>* workers,
    int num_workers)
{
    try
    {
        for (int i=0; i<num_workers; ++i)
        {
            std::shared_ptr<CWorker> worker(new CWorker(this, i));
            workers->push_back(worker);
            work_threads->push_back(std::thread(&CWorker::run, worker));
        }
        return true;
    }
    catch(const std::system_error& ex)
    {
        // 如果有部分啟動功能應當回退,這裡省略了
        fprintf(stderr, "(%d)%s\n", ex.code().value(), ex.what());
        return false;
    }
}

void CMyApplication::stop_workers(
    std::vector<std::thread>* work_threads,
    std::vector<std::shared_ptr<CWorker>>* workers)
{
    for (std::vector<std::shared_ptr<CWorker>>::size_type i=0; i<workers->size(); ++i)
    {
        (*workers)[i]->stop();
        if ((*work_threads)[i].joinable())
            (*work_threads)[i].join();
    }
    work_threads->clear();
    workers->clear();
}

void CMyApplication::recreate_zookeeper_session()
{
    unsigned int count = 0;

    while (!_stop)
    {
        try
        {
            recreate_session();
            _zookeeper_session_expired = false;
            break;
        }
        catch (mooon::utils::CException& ex)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));

            if (0 == count++%30)
            {
                fprintf(stderr, "recreate zookeeper session failed: (count:%d)%s\n", count, ex.str().c_str());
            }
        }
    }
}

void CMyApplication::on_zookeeper_session_connected(const char* path)
{
    fprintf(stdout, "path=%s\n", path);
}

void CMyApplication::on_zookeeper_session_connecting(const char* path)
{
    fprintf(stdout, "path=%s\n", path);
}

void CMyApplication::on_zookeeper_session_expired(const char *path)
{
    fprintf(stdout, "path=%s\n", path);

    std::unique_lock<std::mutex> lock(_mutex);
    _zookeeper_session_expired = true;
    _cond.notify_one(); // 喚醒等待狀態的CMyApplication::run
}

void CMyApplication::on_zookeeper_session_event(int state, const char *path)
{
    fprintf(stdout, "state=%d, path=%s\n", state, path);
}

void CMyApplication::on_zookeeper_event(int type, int state, const char *path)
{
    fprintf(stdout, "type=%d, state=%d, path=%s\n", type, state, path);

    if (ZOO_CONNECTED_STATE == state &&
        ZOO_CHANGED_EVENT == type &&
        0 == strcmp(path, _conf_zkpath.c_str()))
    {
        // 配置發生變化
        std::unique_lock<std::mutex> lock(_mutex);
        _conf_changed = true;
        _cond.notify_one(); // 喚醒等待狀態的CMyApplication::run
    }
}

CWorker::CWorker(CMyApplication* app, int index)
    : _app(app), _index(index), _stop(false)
{
}

void CWorker::run()
{
    fprintf(stdout, "Worker[%d/%llu] \033[1;33mstarted\033[m\n", _index, get_current_thread_id());

    while (!_stop)
    {
        // 執行具體的業務邏輯操作,這裡僅以sleep替代做示範
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        fprintf(stdout, "[%s] Worker[\033[1;33m%d\033[m/%llu] is working ...\n",
            mooon::sys::CDatetimeUtils::get_current_time().c_str(),
            _index, get_current_thread_id());
    }

    fprintf(stdout, "Worker[%d/%llu] \033[1;33mstopped\033[m\n", _index, get_current_thread_id());
}