storm下執行C++程式(一)
轉載,原文地址:http://blog.csdn.net/yan_mount/article/details/11527799
學習storm有段時間了,也搭建了一個簡單的環境,很欣賞它的一些理念,考慮到很多程式是C++實現的,如果要使用該平臺的話,需要為這些程式實現一個介面,方便統一在storm中執行,折騰了幾天,初步成功的實現了一個C++的bolt,特分享如下:
1,需要先定義一個java的殼:
- MyShellBolt extends ShellBolt implements IRichBolt{
- public MyShellBolt()
- {
- super("/bin/sh","start.sh"
- }
- @Override
- publicvoid declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
- @Override
- public Map<String, Object> getComponentConfiguration() {
- returnnull;
- }
- }
2,指令碼start.sh中:
- chmod a+x Bolt
- ./Bolt
這個地方使用了指令碼來中轉一下,因為直接在java中呼叫super("./Bolt")時,遇到了各種問題,要麼找不到檔案,要麼沒有執行許可權,最後使用指令碼才解決這個問題。
3,仿照storm自帶的python程式的例子編寫了C++的庫:
主要是採用jsoncpp庫(jsoncpp-src-0.5.0)實現json串的解析和輸出 比如:
- void StormUtils::storm_emit(const string msg[],size_t size)
- {
- Json::Value out;
- out["command"]="emit";
- for(size_t i=0;i<size;i++)
- {
- out["tuple"].append(msg[i]);
- }
- outs << out.toStyledString() << "end\n"
- return;
- }
- void StormUtils::init(const string& handshake)
- {
- Json::Reader reader;
- Json::Value config;
- if (reader.parse(handshake, config))
- {//parse handshake OK
- constchar* piddir=config["pidDir"].asCString();
- int iPid = (int)getpid();
- char pid[64];
- sprintf(pid, "%d", iPid);
- stringstream ss;
- ss<<piddir<<"/"<<iPid;
- ofstream file(ss.str().c_str(),ofstream::out);
- file.close();
- msg_map msg;
- msg_pair my_pair("pid",pid);
- msg.insert(my_pair);
- storm_send(msg);
- }
- return;
- }
其中要注意的是:
1,init握手函式的json輸出中,pid的value的number,所以不能加“”,其它輸出value都是string。
2,每次輸出到storm平臺的json串最後都要加一行end,如:
outs<<out.toStyledString()<< "end\n"<<endl;
3,jsoncpp的reader.parse函式在遇到非json格式串時,會被阻塞住(不知原因),所以自己還簡單判斷了下storm傳給bolt的訊息串是否是json格式,否則丟棄
其中bolt的主體迴圈流程為:
- void process(::StormUtils& stormUtils)
- {
- Json::Reader reader;
- Json::Value value;
- string msg;
- while(true)
- {
- //this function will be blocked from stdin
- stormUtils.read_msg(std::cin,msg);
- stormUtils.storm_log("read_msg:"+msg);
- size_t pos=msg.find("{");
- if (pos==string::npos)
- {//no {} in string,then discard
- continue;
- }
- elseif (pos>0)
- {//erase the invalid part from msg
- msg=msg.erase(0,pos);
- }
- if (reader.parse(msg, value))
- {
- bool hasID=value.isMember("id");
- if (hasID)
- {//get tuple
- //tuple is array
- constchar* ID=value["id"].asCString();
- const Json::Value tuples = value["tuple"];
- string tuple = tuples[0u].asString();
- vector<string> words=stormUtils.split(tuple," ");
- vector<string>::iterator it;
- for(it=words.begin();it!=words.end();it++)
- {
- stormUtils.storm_log("emit:"+*it);
- string outMsg[]={*it};
- stormUtils.storm_emit(outMsg,1);
- }
- stormUtils.storm_log(ID);
- stormUtils.storm_ack(ID);
- }
- }
- else
- {
- stormUtils.storm_log("msg parse error:"+msg);
- }
- msg.clear();//ready to read again from stdin
- }
- return;
- }
4,打包
把shell指令碼和Bolt執行檔案放在jar包的/resources目錄下即可
5,驗證
無論在本地模式還是叢集模式下都執行成功,除錯時在topology中記得開啟
conf.setDebug(true);
這樣在本地模式執行時,從日誌裡就可以看到C++程式中列印的日誌,如:
4861 [Thread-21] INFO backtype.storm.task.ShellBolt - Shell msg: read_msg:{"id":"-1825914362791431189","stream":"default","comp":"MySpout","tuple":["snow white and the seven dwarfs"],"task":1}
系統啟動C程式的日誌:
4724 [Thread-20] INFO backtype.storm.task.ShellBolt - Launched subprocess with pid 2039
6,問題
1,不知為什麼,bolt程式會接收到非json格式的輸入:
Shell msg: read_msg:[3][3][3][3][3][3]
造成程式僵死,用單元測試也發現jsoncpp有這個問題,還請高手指點驗證
總之,用C++實現的Bolt基本跑通,為後續真正的業務模組(C++實現)的使用打下基礎