Real-Time Log Processing with Flume + RabbitMQ + Storm
阿新 • Published 2019-02-14
1. Overview
Generally speaking, Hadoop covers the vast majority of big-data processing scenarios. But some requirements, such as real-time road-traffic conditions or real-time order status, call for real-time computation, and Hadoop handles these relatively poorly.
There are many frameworks for processing real-time data; here we use Apache Storm. For the queue, Kafka is the usual choice, but since an existing RabbitMQ cluster is already in place, standing up a separate Kafka cluster just for this would waste resources, so RabbitMQ serves as the queue. Log collection is still done with Flume.
2. Setting Up the Storm Environment
For how Storm works internally, search the web; it is not covered here.
Storm requires ZooKeeper, so install ZooKeeper first. Installation was described in a previous post and is omitted here.
Unpack the Storm package, then:
cd storm/conf
vim storm.yaml
The single-node configuration is as follows:
storm.zookeeper.servers:
  - "192.168.130.132"
nimbus.host: "192.168.130.132"
storm.local.dir: "/opt/stormDir"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
ui.port: 8089
Save the configuration.
Start the daemons in order:
./bin/storm nimbus &
./bin/storm supervisor &
./bin/storm ui &
Run jps to check that the processes started successfully.
3. Consuming RabbitMQ from Storm
For how to deploy a topology, refer to the official Storm documentation.
2. Create the Scheme
import java.util.ArrayList;
import java.util.List;

import backtype.storm.spout.Scheme;   // org.apache.storm.spout.Scheme on Storm 1.x+
import backtype.storm.tuple.Fields;

public class RabbitMqScheme implements Scheme {

    @Override
    public List<Object> deserialize(byte[] bytes) {
        List<Object> objs = new ArrayList<Object>();
        // Deserialize the message body directly as a String
        String str = new String(bytes);
        objs.add(str);
        return objs;
    }

    @Override
    public Fields getOutputFields() {
        // One output field per tuple, matching the single value emitted above
        return new Fields("str");
    }
}
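The deserialization itself is plain Java. As a standalone illustration of what the scheme does to a raw message body (no Storm dependency; the class and method names here are my own, chosen for the demo):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SchemeDemo {
    // Mirrors the scheme's deserialize: wrap the raw message body
    // in a single-element tuple holding the String form of the bytes.
    static List<Object> deserialize(byte[] bytes) {
        List<Object> objs = new ArrayList<Object>();
        objs.add(new String(bytes, StandardCharsets.UTF_8));
        return objs;
    }

    public static void main(String[] args) {
        byte[] body = "order paid id=123".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(body));  // prints [order paid id=123]
    }
}
```

Every RabbitMQ message body thus arrives in the topology as a one-field tuple named "str".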
In the topology's main method, before building the topology configuration, add:
RabbitMqScheme scheme = new RabbitMqScheme();
IRichSpout spout = new RabbitMQSpout(scheme);
// host, port, username, password, virtualHost, heartbeat
ConnectionConfig connectionConfig = new ConnectionConfig(
        "192.168.7.79", 5672, "guest", "guest", "/", 10);
ConsumerConfig spoutConfig = new ConsumerConfigBuilder()
        .connection(connectionConfig)
        .queue("sdk_pay_trans_queue_key")
        .prefetch(200)
        .requeueOnFail()
        .build();

That's it; now just write your own logic in your bolts.
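What the bolt does is entirely application-specific. As a hypothetical sketch (the log format and method name are my own invention, not part of the original setup), the per-tuple logic of a bolt that extracts an order id from a log line boils down to plain string handling, shown here without the Storm classes so it can run standalone:

```java
public class BoltLogicDemo {
    // Hypothetical per-tuple logic: pull the value of "id=" out of a log line.
    // In a real bolt this would live in execute(Tuple), followed by emit/ack.
    static String extractOrderId(String line) {
        for (String token : line.split("\\s+")) {
            if (token.startsWith("id=")) {
                return token.substring(3);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(extractOrderId("2019-02-14 INFO order paid id=123"));
    }
}
```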
4. From Flume to RabbitMQ
1. Flume does not support RabbitMQ out of the box. Again, a community plugin found on GitHub does the job, but it targets an older RabbitMQ version; after some debugging I got it working with the version provided in the download link below. At present it only supports queues bound to a direct exchange, i.e. the case where routing-key = queueName.
2. The Flume configuration is as follows:
a1.sources = r2
a1.sinks = k3
a1.channels = c3

a1.sources.r2.type = spooldir
a1.sources.r2.spoolDir = /var/log/flume_spoolDir_for_rabbitmq
a1.sources.r2.deletePolicy = immediate
a1.sources.r2.basenameHeader = true
a1.sources.r2.channels = c3

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 200

a1.sinks.k3.type = com.aweber.flume.sink.rabbitmq.RabbitMQSink
a1.sinks.k3.host = 192.168.7.79
a1.sinks.k3.port = 5672
a1.sinks.k3.virtual-host = /
a1.sinks.k3.username = guest
a1.sinks.k3.password = guest
a1.sinks.k3.exchange = mq-exchange
a1.sinks.k3.routing-key = test_aaa
#a1.sinks.k3.publisher-confirms = true
a1.sinks.k3.channel = c3
Save the configuration, then drop log files into the directory Flume is watching; you can see them being written into RabbitMQ line by line.
With that, the log pipeline from Flume -> RabbitMQ -> Storm is complete. A full working example will be posted later when time allows.
Required packages:
http://pan.baidu.com/s/1nvzUxi5