數據轉發處理工具logstash
1、logstash是什麽
Logstash 是有管道輸送能力的開源數據收集引擎。它可以動態地從分散的數據源收集數據,並且標準化數據輸送到你選擇的目的地。它是一款日誌而不僅限於日誌的搜集處理框架,將分散多樣的數據搜集自定義處理並輸出到指定位置。
2、logstash 原理
logstash 原理非常簡單,就是將數據收集起來,經過filter操作,然後通過output轉發出去。
3、實例
problems:最近接到一個需求,要求從一個集群的kafka中讀取原始日誌到另一個集群的kafka隊列中,其實就是做kafka集群的數據遷移。
solve:傳統的方式其實也很好做,就是寫一個程序,將這個集群的每個kafka topic中的數據讀到,寫入到另一個集群的kafka隊列中。但是這種做法太古老,事情不是很急,想想有啥開源的工具可用。(從hive、pig封裝MapReduce,thriftserver封裝spark JOB受到啟示),果然還真有工具這就是logstash。
看了下官網的入門文檔,非常簡單。支持文件、ES、kafka、redis、hdfs很多輸入和輸出。
步驟:
1)、首先下載從官網下載工具,https://www.elastic.co/downloads/logstash,我下載的是zip包,版本 logstash-6.2.2.zip。
2)、用unzip命令解壓縮,進入config目錄。解壓之後包含目錄如下:
3)、創建配置文件kafka_input.conf,名稱自己指定。
input{ kafka{ bootstrap_servers => ["10.67.1.150:9092,10.67.1.151:9092,10.67.1.152:9092"] client_id => "tam_normal_1" group_id => "tam_normal_1" auto_offset_reset => "latest" consumer_threads => 5 decorate_events => true topics => ["bsatam_normal"] session_timeout_ms => "60000" codec => plain { format => "%{message}" } } } output{ kafka{ topic_id => ["bsatam_normal"] bootstrap_servers => ["10.67.1.205:9092,10.67.1.206:9092,10.67.1.207:9092"] codec => plain { format => "%{message}" } } }
kafka的配置都很熟,重點說下codec 這裏的配置,這裏是配置輸出的格式,我是接送格式,所以直接獲取了message。
4)、在config目錄直接輸入命令./../bin/logstash -f kafka_input.conf --path.data=/home/bsauser/logstash-6.2.2/data/kafka_input ,啟動轉發程序。
5)、在output配置的kafka集群中就可以讀到數據了。
啟動成功截圖:
需要註意的問題:
1、 需要修改你發送集群的host主機,否則kafka發數據的時候會報超時,當初這個問題也困擾了我很久。
比如我在10.67.1.205集群上運行,我需要在205集群節點的主機10.67.1.205上配置kafka_input中配置的主機信息,比如在205主機上加上:
10.67.1.150 bsa150
10.67.1.151 bsa151
10.67.1.152 bsa152
2、這個工具會將數據格式的每個字段都轉化成type類型,input和output中間可以加上filter配置,用於對原始日誌的過濾操作。
3、path.data 參數一定要指定,之前沒指定也運行成功過,但是後續報錯了。同時運行多個實例的時候,最好是指定下,裏面存的是臨時文件信息。
數據轉發處理工具logstash