Zookeeper C++程式設計實戰之配置更新
阿新 • • 發佈:2018-12-07
CZookeeperHelper:
https://github.com/eyjian/libmooon/blob/master/include/mooon/net/zookeeper_helper.h
CMainHelper:
https://github.com/eyjian/libmooon/blob/master/include/mooon/sys/main_template.h
// 演示一個多執行緒程式如何藉助zookeeper,實現配置的動態更新 // // 實現理念(有些場景不適合): // 1) 讓執行緒不涉及配置的動態更新,這樣避免了動態更新配置 // 2) 通過建立新執行緒的方式達到配置動態更新的目的,老的執行緒直接退出 // 3) 先建立新執行緒,再退出老執行緒,保持服務不中斷 // // 實際上,也可以通過父子程序方式來達到配置動態更新, // 父程序檢測到配置更新後,父程序讀取配置,並檢查配置的合法性。 // 如果合法則建立新的子程序,完成後再kill原有的子程序, // 這樣子程序就不涉及配置更新邏輯。 // // 這兩種方法,均可比較簡單應對複雜的配置動態更新, // 但如果新舊配置無法同時相容,則需要先停掉老的執行緒或程序, // 然後再啟動新的執行緒或程序,否則做到無縫地動態更新。 // // 編譯要求環境:C++11或更高 // 編譯語句大致如下: // g++ -g -o b zk_conf_example.cpp -I/usr/local/mooon/include -I/usr/local/zookeeper/include /usr/local/mooon/lib/libmooon.a /usr/local/zookeeper/lib/libzookeeper_mt.a -pthread -std=c++11 -DMOOON_HAVE_ZOOKEEPER=1 -lz #include <mooon/net/zookeeper_helper.h> #include <mooon/sys/datetime_utils.h> // 格式化時間也可以考慮C++標準庫提供的std::put_time #include <mooon/sys/main_template.h> #include <mooon/utils/args_parser.h> #include <chrono> #include <condition_variable> #include <mutex> #include <system_error> #include <thread> // 指定存放配置的zookeeper STRING_ARG_DEFINE(zookeeper, "", "Comma separated list of servers in the ZooKeeper Quorum, example: --zookeeper=127.0.0.1:2181"); class CMyApplication; // 負責具體業務的工作者(執行緒) class CWorker { public: CWorker(CMyApplication* app, int index); void run(); // 執行緒入口函式 void stop() { _stop = true; } private: CMyApplication* _app; int _index; volatile bool _stop; }; // 應用程式主類(或叫上下文類,也可叫入口類) // 通過繼承CZookeeperHelper,獲得zookeeper操作能力, // 包括讀寫zookeeper資料能力、發現配置更新能力和主備切換能力。 // // 可繼承mooon::sys::CMainHelper, // 以獲得通過訊號SIGTERM的優雅退出能力, // CMainHelper提供了優雅和安全的訊號處理, // 預設的優雅退出訊號為SIGTERM,可自定義為其它訊號。 class CMyApplication: public mooon::net::CZookeeperHelper, public mooon::sys::CMainHelper { public: CMyApplication(); private: // num_workers 需要啟動的CWorker個數 bool start_workers( std::vector<std::thread>* work_threads, std::vector<std::shared_ptr<CWorker>>* workers, int num_workers); void stop_workers( std::vector<std::thread>* work_threads, std::vector<std::shared_ptr<CWorker>>* workers); // 當zookeeper的會話過期後, // 需要呼叫recreate_zookeeper_session重新建立會話 void recreate_zookeeper_session(); // 實現父類CMainHelper定義的虛擬函式(實為回撥函式), // 以下五個“on_”函式,均執行在獨立的訊號執行緒中,而不是主執行緒中。 private: // 主執行緒的呼叫順序: // main() // -> on_check_parameter() -> on_init() // -> on_run() -> on_fini() // // 注意on_terminated()是由訊號觸發的, // 由獨立的訊號執行緒呼叫,但位於on_init()之後。 virtual bool on_check_parameter(); virtual bool on_init(int argc, char* argv[]); virtual bool on_run(); // 這裡使得配置動態生效 virtual void on_fini(); virtual void on_terminated(); // 實現父類CZookeeperHelper定義的虛擬函式(實為回撥函式) // 以下五個“on_”函式,均執行在獨立的zookeeper執行緒中,而不是主執行緒中。 private: virtual void on_zookeeper_session_connected(const char* path); virtual void on_zookeeper_session_connecting(const char* path); virtual void on_zookeeper_session_expired(const char *path); virtual void on_zookeeper_session_event(int state, const char *path); virtual void on_zookeeper_event(int type, int state, const char *path); private: volatile bool _stop; std::mutex _mutex; std::condition_variable _cond; std::vector<std::thread> _work_threads; std::vector<std::shared_ptr<CWorker>> _workers; private: volatile bool _conf_changed; // 配置發生變化 volatile bool _zookeeper_session_expired; // zookeeper的會話(session)過期 std::string _zk_nodes; // 存放配置的zookeeper節點列表 std::string _conf_zkpath; // 配置的zookeeper節點路徑 }; int main(int argc, char* argv[]) { CMyApplication app; return mooon::sys::main_template(&app, argc, argv); } static unsigned long long get_current_thread_id() { std::stringstream ss; ss << std::this_thread::get_id(); return std::stoull(ss.str()); } CMyApplication::CMyApplication() : _stop(false), _conf_changed(false), _zookeeper_session_expired(false) { _conf_zkpath = "/tmp/conf"; } bool CMyApplication::on_check_parameter() { // 命令列引數“--zookeeper”不能為空 return !mooon::argument::zookeeper->value().empty(); } bool CMyApplication::on_init(int argc, char* argv[]) { try { // 以this方式呼叫的函式,均為CZookeeperHelper提供 _zk_nodes = mooon::argument::zookeeper->value(); this->create_session(_zk_nodes); // zookeeper的會話(session)是非同步建立的, // 只有連線成功後,方可讀取存放在zookeeper上的配置資料。 for (int i=0; i<5&&!_stop; ++i) { if (this->is_connected()) break; else std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } if (!this->is_connected()) { fprintf(stderr, "Can not connect zookeeper://%s\n", _zk_nodes.c_str()); return false; } else { // 取zookeeper節點資料 std::string zkdata; int n = get_zk_data(_conf_zkpath.c_str(), &zkdata, 4); if (n > 4 || zkdata.empty()) { // 配置資料的大小超出預期 fprintf(stderr, "conf size error: %d\n", n); return false; } else { // 如果zkdata不是一個有效的數字, // stoi會丟擲異常invalid_argument const int num_workers = std::stoi(zkdata); if (num_workers < 1 || num_workers > 10) { fprintf(stderr, "conf error: %d\n", num_workers); return false; } else { return start_workers(&_work_threads, &_workers, num_workers); } } } } catch (std::invalid_argument& ex) { fprintf(stderr, "%s\n", ex.what()); return false; } catch (mooon::sys::CSyscallException& ex) { fprintf(stderr, "%s\n", ex.str().c_str()); return false; } catch (mooon::utils::CException& ex) { fprintf(stderr, "%s\n", ex.str().c_str()); return false; } } bool CMyApplication::on_run() { while (!_stop) { std::unique_lock<std::mutex> lock(_mutex); _cond.wait(lock); // 等待配置更新或收到退出指令 if (_stop) { break; } // 以下實現省略了函式呼叫拋異常處理 if (_zookeeper_session_expired) { // 如果會話過期,則需要重新建會話 recreate_zookeeper_session(); } if (_stop) { // 在建立會話過程中,可能收到了停止指令 break; } if (_conf_changed) { _conf_changed = false; // 讀取新的配置 std::string zkdata; int n = get_zk_data(_conf_zkpath.c_str(), &zkdata, 4); if (n > 4) { // 這種情況下應觸發告警 // 配置資料的大小超出預期 fprintf(stderr, "conf size error: %d\n", n); } else { // 這裡可考慮加上優化: // 只有配置確實發生變化時才進行後續操作。 const int num_workers = std::stoi(zkdata); if (num_workers < 1 || num_workers > 10) { // 這種情況下應觸發告警 fprintf(stderr, "conf error: %d\n", num_workers); } else { std::vector<std::thread> work_threads; std::vector<std::shared_ptr<CWorker>> workers; // 新的配置生效,才停掉原來的, // 防止因為誤操破壞配置,導致整個系統崩潰 if (!start_workers(&work_threads, &workers, num_workers)) { // 這種情況下應觸發告警 } else { stop_workers(&_work_threads, &_workers); _work_threads.swap(work_threads); _workers.swap(workers); } } } } } return true; } void CMyApplication::on_fini() { // 應用退出時被呼叫 fprintf(stdout, "Application is about to quit\n"); } // 接收到了SIGTERM訊號 void CMyApplication::on_terminated() { // 一定要最先呼叫父類CMainHelper的on_terminated mooon::sys::CMainHelper::on_terminated(); _stop = true; stop_workers(&_work_threads, &_workers); std::unique_lock<std::mutex> lock(_mutex); _cond.notify_one(); // 喚醒等待狀態的CMyApplication::run } bool CMyApplication::start_workers( std::vector<std::thread>* work_threads, std::vector<std::shared_ptr<CWorker>>* workers, int num_workers) { try { for (int i=0; i<num_workers; ++i) { std::shared_ptr<CWorker> worker(new CWorker(this, i)); workers->push_back(worker); work_threads->push_back(std::thread(&CWorker::run, worker)); } return true; } catch(const std::system_error& ex) { // 如果有部分啟動功能應當回退,這裡省略了 fprintf(stderr, "(%d)%s\n", ex.code().value(), ex.what()); return false; } } void CMyApplication::stop_workers( std::vector<std::thread>* work_threads, std::vector<std::shared_ptr<CWorker>>* workers) { for (std::vector<std::shared_ptr<CWorker>>::size_type i=0; i<workers->size(); ++i) { (*workers)[i]->stop(); if ((*work_threads)[i].joinable()) (*work_threads)[i].join(); } work_threads->clear(); workers->clear(); } void CMyApplication::recreate_zookeeper_session() { unsigned int count = 0; while (!_stop) { try { recreate_session(); _zookeeper_session_expired = false; break; } catch (mooon::utils::CException& ex) { std::this_thread::sleep_for(std::chrono::milliseconds(2000)); if (0 == count++%30) { fprintf(stderr, "recreate zookeeper session failed: (count:%d)%s\n", count, ex.str().c_str()); } } } } void CMyApplication::on_zookeeper_session_connected(const char* path) { fprintf(stdout, "path=%s\n", path); } void CMyApplication::on_zookeeper_session_connecting(const char* path) { fprintf(stdout, "path=%s\n", path); } void CMyApplication::on_zookeeper_session_expired(const char *path) { fprintf(stdout, "path=%s\n", path); std::unique_lock<std::mutex> lock(_mutex); _zookeeper_session_expired = true; _cond.notify_one(); // 喚醒等待狀態的CMyApplication::run } void CMyApplication::on_zookeeper_session_event(int state, const char *path) { fprintf(stdout, "state=%d, path=%s\n", state, path); } void CMyApplication::on_zookeeper_event(int type, int state, const char *path) { fprintf(stdout, "type=%d, state=%d, path=%s\n", type, state, path); if (ZOO_CONNECTED_STATE == state && ZOO_CHANGED_EVENT == type && 0 == strcmp(path, _conf_zkpath.c_str())) { // 配置發生變化 std::unique_lock<std::mutex> lock(_mutex); _conf_changed = true; _cond.notify_one(); // 喚醒等待狀態的CMyApplication::run } } CWorker::CWorker(CMyApplication* app, int index) : _app(app), _index(index), _stop(false) { } void CWorker::run() { fprintf(stdout, "Worker[%d/%llu] \033[1;33mstarted\033[m\n", _index, get_current_thread_id()); while (!_stop) { // 執行具體的業務邏輯操作,這裡僅以sleep替代做示範 std::this_thread::sleep_for(std::chrono::milliseconds(2000)); fprintf(stdout, "[%s] Worker[\033[1;33m%d\033[m/%llu] is working ...\n", mooon::sys::CDatetimeUtils::get_current_time().c_str(), _index, get_current_thread_id()); } fprintf(stdout, "Worker[%d/%llu] \033[1;33mstopped\033[m\n", _index, get_current_thread_id()); }