c++通過Thrift向flume傳送資料
在flume原始碼包apache-flume-1.7.0-src\flume-ng-sdk\src\main\thrift\flume.thrift 目錄下,有Flume的Thrift介面定義(IDL)語言,通過該Thrift可以生成客戶端模組基礎程式碼,此處我們生成相應的c++程式碼:
thrift -r --gen cpp flume.thrift
編譯過程中可能會報 "libthrift-0.9.3.so: cannot open shared object
file: No such file or directory"
臨時解決方法是執行命令
export LD_LIBRARY_PATH=/usr/local/lib
或者在 /etc/ld.so.conf中加入libthrift庫所在路徑,本機是 /usr/local/lib
命令執行成功後,在當前目錄下生成gen-cpp資料夾,內有檔案
flume_constants.cpp
flume_constants.h
flume_types.cpp
flume_types.h
ThriftSourceProtocol.cpp
ThriftSourceProtocol.h
ThriftSourceProtocol_server.skeleton.cpp
檔案ThriftSourceProtocol_server.skeleton.cpp是系統為我們自動生成的一個客戶端,可以在此基礎上實現我們自己的應用程式碼
flume.cpp:
編譯:#include "gen-cpp/flume_constants.h" #include "gen-cpp/flume_types.h" #include "gen-cpp/ThriftSourceProtocol.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TCompactProtocol.h> #include <thrift/transport/TSocket.h> #include <thrift/transport/TTransportUtils.h> #include <vector> using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; #define LOOP 200000 int right_num = 0; int error_num = 0; class ThriftClient{ public: /* Thrift protocol needings... */ boost::shared_ptr<TTransport> socket; boost::shared_ptr<TTransport> transport; boost::shared_ptr<TProtocol> protocol; ThriftSourceProtocolClient* pClient; public: void sendEvent(); ThriftClient(); }; ThriftClient::ThriftClient(): socket(new TSocket("127.0.0.1",9413)), transport(new TFramedTransport(socket)), protocol(new TCompactProtocol(transport)) { pClient = new ThriftSourceProtocolClient(protocol); } //transport(new TBufferedTransport(socket)), void ThriftClient::sendEvent() { std::map<std::string, std::string> headers; headers.insert(std::make_pair("head", "head")); std::string sBody = "TableName:TEST_TABLE ConfigID:5555 ResponseIP:77522222 ProtoType:67 StartTime:3333 Interval:24544 AccessTimes:45 DomainLen:12\n"; if(!transport->isOpen()) { transport->open(); } ThriftFlumeEvent tfEvent; tfEvent.__set_headers(headers); tfEvent.__set_body(sBody); Status::type res; int i=0; std::vector<ThriftFlumeEvent> eventbatch; for(;i<LOOP;i++){ //tfEvent.__set_body(sBody); eventbatch.clear(); int j=1; for(;j<=50;j++) eventbatch.push_back(tfEvent); res =pClient->appendBatch(eventbatch); if(res == Status::OK){ right_num++; }else{ error_num++; printf("WARNING: send event via thrift failed, return code:%d\n",res); } } } int main(int argc, char * argv[]){ ThriftClient *client = new ThriftClient(); client->sendEvent(); printf("RIGHT: success num:%d\n",right_num); printf("ERROR: failed num:%d\n",error_num); client->transport->close(); }
g++ -g -Wall -I./ -I/usr/local/include/thrift flume.cpp
gen-cpp/flume_constants.cpp gen-cpp/flume_types.cpp gen-cpp/ThriftSourceProtocol.cpp -L/usr/local/lib/*.so -lthrift -o flume_client
測試:
1、在本機啟動flume,配置thrift souece埠9413,sink方式為logger,啟動flume,配置檔案如下:
agent.sources = r1
agent.sinks = k1
agent.channels = c1
# Describe/configure the source
agent.sources.r1.type = thrift
agent.sources.r1.port = 9413
agent.sources.r1.bind = 0.0.0.0
agent.sources.r1.threads = 50
# Use a channel which buffers events in file
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000000
agent.channels.c1.transactionCapacity= 2000
# Describe the sink k1
agent.sinks.k1.type = logger
# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
2、啟動flume
bin/flume-ng agent --conf ./conf/ -f conf/thrift_logger.conf -n agent -Dflume.root.logger=INFO,console &
3、執行server,在flume端看到列印:
2017-09-15 17:51:03,229 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{head=head} body: 54 61 62 6C 65 4E 61 6D 65 3A 54 45 53 54 5F 54 TableName:TEST_T }
2017-09-15 17:51:03,231 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{head=head} body: 54 61 62 6C 65 4E 61 6D 65 3A 54 45 53 54 5F 54 TableName:TEST_T }
2017-09-15 17:51:03,233 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{head=head} body: 54 61 62 6C 65 4E 61 6D 65 3A 54 45 53 54 5F 54 TableName:TEST_T }
2017-09-15 17:51:03,233 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{head=head} body: 54 61 62 6C 65 4E 61 6D 65 3A 54 45 53 54 5F 54 TableName:TEST_T }