使用zookeeper監控服務
阿新 • • 發佈:2019-02-15
只針對windows平臺, linux平臺比較容易,就不寫了.
一、.單機 安裝zookeeper的三個服務.目錄層次如下:
e:|
|zks
| |server1
| |data
| |zookeeper
| |conf
| |bin
| |lib
| |server2
|
data
| |zookeeper
| |conf
| |bin
| |lib
| |server3 |
data
| |zookeeper
| |conf
| |bin
| |lib
|| |run.bat
其中run.bat檔案的內容為:
cd server1\zookeeper\bin
start zkServer.cmd
cd ..\..\..
cd server2\zookeeper\bin
start zkServer.cmd
cd ..\..\..
cd server3\zookeeper\bin
start zkServer.cmd
二、開啟zookeeper\src\c\下的工程檔案, 編譯其, 可得zookeeper的c客戶端(dll)
三、編寫程序監控類, 內容如下:
ZKOpeator.h
#ifndef _COGLINKZKOPERATORH_ #define _COGLINKZKOPERATORH_ #include <string> #include <zookeeper.h> #include <boost/thread/recursive_mutex.hpp> #include <boost/thread.hpp> #include <boost/thread/condition_variable.hpp> typedef void (* DoCore)(); namespace coglink{ namespace base{ class ZKOperator{ public: static ZKOperator *Instance(); bool Init(const std::string &hostport, int timeouts); // 以下為監控相關 void StartZKThread(const std::string &ppath, const std::string &vvalue, DoCore doCore); void StartIfNecessary(); bool IsNeedStartAnCopy(); // 以下為配置相關 std::string GetValue(const std::string &ppath); private: ZKOperator(); void MainCore(); void WaitChange(); private: DoCore doCore_; std::string ppath_; std::string value_; boost::thread_group threadGroup_; boost::condition_variable_any cond_; boost::recursive_mutex mutex_; zhandle_t *zh_; bool waiting_; static ZKOperator * staticP_; bool startAnCopy_; }; }} #endif
ZKOperator.cpp
#include "ZKOperator.h"
namespace coglink{ namespace base{
ZKOperator *ZKOperator::staticP_ = 0;
void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context)
{
if(type==4 && state==3){
ZKOperator::Instance()->StartIfNecessary();
}
}
ZKOperator::ZKOperator(){
waiting_ = false;
zh_ = 0;
startAnCopy_ = false;
}
ZKOperator *ZKOperator::Instance(){
if(!staticP_){
staticP_ = new ZKOperator();
}
return staticP_;
}
bool ZKOperator::IsNeedStartAnCopy(){
return startAnCopy_;
}
void ZKOperator::WaitChange(){
int pos = (int)ppath_.rfind("/");
if(pos < 0){
return;
}
std::string rootPath = ppath_.substr(0, pos);
String_vector vecs;
zoo_get_children(zh_, rootPath.c_str(), 1, &vecs);
}
void ZKOperator::MainCore(){
doCore_();
}
void ZKOperator::StartIfNecessary(){
int re = zoo_exists(zh_, ppath_.c_str(), 0, 0);
while(1){
if(re == ZNONODE){
re = zoo_create(zh_, ppath_.c_str(), value_.c_str(), value_.length(), &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, 0, 0);
if(re != ZOK){
printf("create node error!=====\r\n");
break;
}
startAnCopy_ = true;
boost::thread * th = new boost::thread(boost::bind(&ZKOperator::MainCore, this));
threadGroup_.add_thread(th);
}
break;
}
WaitChange();
if(threadGroup_.size() == 0){
if(waiting_){
return;
}
boost::recursive_mutex::scoped_lock lock(mutex_);
waiting_ = true;
cond_.wait(lock);
}
}
bool ZKOperator::Init(const std::string &hostport, int timeouts){
zh_ = zookeeper_init(hostport.c_str(), watcher, timeouts, 0, 0, 0);
if (!zh_) {
printf("init zk error!\r\n");
return false;
}
return true;
}
void ZKOperator::StartZKThread(const std::string &ppath, const std::string &vvalue, DoCore doCore){
ppath_ = ppath;
value_ = vvalue;
doCore_ = doCore;
StartIfNecessary();
threadGroup_.join_all();
}
std::string ZKOperator::GetValue(const std::string &ppath){
std::string tmppath = "/zookeeper/" + ppath;
int oldpos = -1;
int pos = 0;
while(1){
pos = (int)tmppath.find(".", oldpos + 1);
if(pos < 0){
break;
}
tmppath.replace(pos, 1, "/");
}
std::string re;
int len = 255;
char *buf = new char[len];
int ne = zoo_get(zh_, tmppath.c_str(), 0, buf, &len, 0);
if(ne != ZOK){
delete []buf;
return re;
}
re = std::string(buf, len);
delete []buf;
return re;
}
}}
四、呼叫處
_logicNo = std::string(argv[1]);
std::string ppath = "/zookeeper/logicpoint/";
ppath += _logicNo;
coglink::base::ZKOperator::Instance()->StartZKThread(ppath, "", MainCore);
在MainCore中會啟動主要業務程式碼, 同時會新啟動一個程序(自己的名字), 如下:
if(coglink::base::ZKOperator::Instance()->IsNeedStartAnCopy()){
ShellExecute(NULL,"open", coglink::base::CogPath::GetAppWholePath().c_str(),_logicNo.c_str(),NULL,SW_MINIMIZE);
}
注意上方的ShellExecute, 我原來用的是system函式(之後據我測試可知此函式是啟動一個子程序).
用system函式的結果是: 主程序退出後子程序代替其開始主要業務處理, 而由於主程序是意外退出的, 作業系統只知其還有子程序沒退出, 因而就不會釋放一些被佔用的埠,
導致子程序再次listen時失敗.