1. 程式人生 > >c++通過Thrift向flume傳送資料

c++通過Thrift向flume傳送資料

在flume原始碼包apache-flume-1.7.0-src\flume-ng-sdk\src\main\thrift\flume.thrift 目錄下,有Flume的Thrift介面定義(IDL)語言,通過該Thrift可以生成客戶端模組基礎程式碼,此處我們生成相應的c++程式碼:

thrift -r --gen cpp flume.thrift
編譯過程中可能會報 "libthrift-0.9.3.so: cannot open shared object file: No such file or directory"

臨時解決方法是執行命令 

export LD_LIBRARY_PATH=/usr/local/lib

或者在 /etc/ld.so.conf中加入libthrift庫所在路徑,本機是 /usr/local/lib

命令執行成功後,在當前目錄下生成gen-cpp資料夾,內有檔案

flume_constants.cpp  

flume_constants.h  

flume_types.cpp  

flume_types.h  

ThriftSourceProtocol.cpp  

ThriftSourceProtocol.h  

ThriftSourceProtocol_server.skeleton.cpp

檔案ThriftSourceProtocol_server.skeleton.cpp是系統為我們自動生成的一個客戶端,可以在此基礎上實現我們自己的應用程式碼

flume.cpp:

#include "gen-cpp/flume_constants.h"  
#include "gen-cpp/flume_types.h"  
#include "gen-cpp/ThriftSourceProtocol.h"  
#include <thrift/protocol/TBinaryProtocol.h>  
#include <thrift/protocol/TCompactProtocol.h>  
#include <thrift/transport/TSocket.h>  
#include <thrift/transport/TTransportUtils.h>  
#include <vector>  
  
using namespace std;  
using namespace apache::thrift;  
using namespace apache::thrift::protocol;  
using namespace apache::thrift::transport;  
#define LOOP 200000  
  
int right_num = 0;  
int error_num = 0;  
  
class ThriftClient{  
    public:  
    /* Thrift protocol needings... */  
    boost::shared_ptr<TTransport> socket;  
    boost::shared_ptr<TTransport> transport;  
    boost::shared_ptr<TProtocol> protocol;  
    ThriftSourceProtocolClient* pClient;  
  
    public:  
    void sendEvent();  
    ThriftClient();  
  
};  
  
ThriftClient::ThriftClient():  
        socket(new TSocket("127.0.0.1",9413)),  
        transport(new TFramedTransport(socket)),  
        protocol(new TCompactProtocol(transport))  
    {  
        pClient = new ThriftSourceProtocolClient(protocol);  
    }  
  
//transport(new TBufferedTransport(socket)),  
void ThriftClient::sendEvent()  
{  
    std::map<std::string, std::string>  headers;  
    headers.insert(std::make_pair("head", "head"));  
    std::string sBody = "TableName:TEST_TABLE ConfigID:5555 ResponseIP:77522222 ProtoType:67 StartTime:3333 Interval:24544 AccessTimes:45 DomainLen:12\n";  
    if(!transport->isOpen())  
    {  
        transport->open();  
    }  
    ThriftFlumeEvent tfEvent;  
    tfEvent.__set_headers(headers);  
    tfEvent.__set_body(sBody);  
    Status::type res;  
    int i=0;  
    std::vector<ThriftFlumeEvent> eventbatch;  
    for(;i<LOOP;i++){  
        //tfEvent.__set_body(sBody);  
        eventbatch.clear();  
        int j=1;  
        for(;j<=50;j++)  
            eventbatch.push_back(tfEvent);  
        res =pClient->appendBatch(eventbatch);  
        if(res == Status::OK){  
            right_num++;  
        }else{  
            error_num++;  
            printf("WARNING: send event via thrift failed, return code:%d\n",res);  
        }  
    }  
}  
  
int main(int argc, char * argv[]){  
    ThriftClient *client = new ThriftClient();  
    client->sendEvent();  
    printf("RIGHT: success num:%d\n",right_num);  
    printf("ERROR: failed num:%d\n",error_num);  
    client->transport->close();  
}
編譯:

g++ -g -Wall -I./ -I/usr/local/include/thrift flume.cpp gen-cpp/flume_constants.cpp gen-cpp/flume_types.cpp gen-cpp/ThriftSourceProtocol.cpp  -L/usr/local/lib/*.so -lthrift -o flume_client

測試:

1、在本機啟動flume,配置thrift souece埠9413,sink方式為logger,啟動flume,配置檔案如下:

agent.sources = r1  
agent.sinks = k1  
agent.channels = c1  
  
# Describe/configure the source  
agent.sources.r1.type = thrift  
agent.sources.r1.port = 9413  
agent.sources.r1.bind = 0.0.0.0  
agent.sources.r1.threads = 50  
  
# Use a channel which buffers events in file  
agent.channels.c1.type = memory  
agent.channels.c1.capacity = 10000000  
agent.channels.c1.transactionCapacity= 2000  
  
# Describe the sink k1  
agent.sinks.k1.type = logger  
  
# Bind the source and sink to the channel  
agent.sources.r1.channels = c1  
agent.sinks.k1.channel = c1  

2、啟動flume
bin/flume-ng agent --conf ./conf/ -f conf/thrift_logger.conf -n agent -Dflume.root.logger=INFO,console &  
3、執行server,在flume端看到列印:
2017-09-15 17:51:03,229 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{head=head} body: 54 61 62 6C 65 4E 61 6D 65 3A 54 45 53 54 5F 54 TableName:TEST_T }
2017-09-15 17:51:03,231 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{head=head} body: 54 61 62 6C 65 4E 61 6D 65 3A 54 45 53 54 5F 54 TableName:TEST_T }
2017-09-15 17:51:03,233 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{head=head} body: 54 61 62 6C 65 4E 61 6D 65 3A 54 45 53 54 5F 54 TableName:TEST_T }
2017-09-15 17:51:03,233 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{head=head} body: 54 61 62 6C 65 4E 61 6D 65 3A 54 45 53 54 5F 54 TableName:TEST_T }