1. 程式人生 > >數據轉發處理工具logstash

數據轉發處理工具logstash

輸入 文檔 serve 官網下載 解壓縮 logs www. cli 進入

1、logstash是什麽
Logstash 是有管道輸送能力的開源數據收集引擎。它可以動態地從分散的數據源收集數據,並且標準化數據輸送到你選擇的目的地。它是一款日誌而不僅限於日誌的搜集處理框架,將分散多樣的數據搜集自定義處理並輸出到指定位置。

2、logstash 原理
logstash 原理非常簡單,就是將數據收集起來,經過filter操作,然後通過output轉發出去。

3、實例
problems:最近接到一個需求,要求從一個集群的kafka中讀取原始日誌到另一個集群的kafka隊列中,其實就是做kafka集群的數據遷移。

solve:傳統的方式其實也很好做,就是寫一個程序,將這個集群的每個kafka topic中的數據讀到,寫入到另一個集群的kafka隊列中。但是這種做法太古老,事情不是很急,想想有啥開源的工具可用。(從hive、pig封裝MapReduce,thriftserver封裝spark JOB受到啟示),果然還真有工具這就是logstash。

看了下官網的入門文檔,非常簡單。支持文件、ES、kafka、redis、hdfs很多輸入和輸出。

步驟:

1)、首先下載從官網下載工具,https://www.elastic.co/downloads/logstash,我下載的是zip包,版本 logstash-6.2.2.zip。

2)、用unzip命令解壓縮,進入config目錄。解壓之後包含目錄如下:

技術分享圖片

3)、創建配置文件kafka_input.conf,名稱自己指定。

input{
      kafka{
        bootstrap_servers => ["10.67.1.150:9092,10.67.1.151:9092,10.67.1.152:9092"]
        client_id => "tam_normal_1"
        group_id => "tam_normal_1"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => true
        topics => ["bsatam_normal"] 
        session_timeout_ms => "60000"
    	codec => plain {
          format => "%{message}"
    }
      }
}
output{
     kafka{
        topic_id => ["bsatam_normal"]
        bootstrap_servers => ["10.67.1.205:9092,10.67.1.206:9092,10.67.1.207:9092"]
        codec => plain {
          format => "%{message}"
        }
    }
}

kafka的配置都很熟,重點說下codec 這裏的配置,這裏是配置輸出的格式,我是接送格式,所以直接獲取了message。

4)、在config目錄直接輸入命令./../bin/logstash -f kafka_input.conf --path.data=/home/bsauser/logstash-6.2.2/data/kafka_input ,啟動轉發程序。

5)、在output配置的kafka集群中就可以讀到數據了。

啟動成功截圖:

技術分享圖片

需要註意的問題:

1、 需要修改你發送集群的host主機,否則kafka發數據的時候會報超時,當初這個問題也困擾了我很久。
比如我在10.67.1.205集群上運行,我需要在205集群節點的主機10.67.1.205上配置kafka_input中配置的主機信息,比如在205主機上加上:
10.67.1.150 bsa150
10.67.1.151 bsa151
10.67.1.152 bsa152

2、這個工具會將數據格式的每個字段都轉化成type類型,input和output中間可以加上filter配置,用於對原始日誌的過濾操作。

3、path.data 參數一定要指定,之前沒指定也運行成功過,但是後續報錯了。同時運行多個實例的時候,最好是指定下,裏面存的是臨時文件信息。

數據轉發處理工具logstash