1. 程式人生 > 實用技巧 >原理實踐,全面講解Logstash+Kibana+kafka

原理實踐,全面講解Logstash+Kibana+kafka

前面的時候,我因為後臺粉絲的一些問題,整理了一篇文章,將ELK三個技術進行詳細的講解,從原理到實踐,全面覆蓋,但是因為篇幅原因,我分成了兩篇進行整理,上篇主講ES

今天是其他的個技術:Logstash+Kibana,中間穿插著講解Kafka應用,個人公眾號:Java架構師聯盟,每日更新技術好文

話不多說,直接上正題

一、 Logstash資料採集工具安裝和使用

1. 簡介

Logstash是一款輕量級的日誌蒐集處理框架,可以方便的把分散的、多樣化的日誌蒐集起來,並進行自定義的處理,然後傳輸到指定的位置,比如某個伺服器或者檔案。

而在官網,對於Logstash的介紹更是完整,我這裡就展示一下官網的介紹

輸入:採集各種樣式、大小和來源的資料

過濾器:實時解析和轉換資料

輸出:選擇你的儲存,匯出你的資料

而在官網的介紹中,最讓我興奮的就是可擴充套件性,Logstash 採用可插拔框架,擁有 200 多個外掛。您可以將不同的輸入選擇、過濾器和輸出選擇混合搭配、精心安排,讓它們在管道中和諧地執行。這也就意味著可以用自己的方式建立和配置管道,就跟樂高積木一樣,我自己感覺太爽了

好了,理論的東西過一遍就好

ps:不過這也體現出官網在學習的過程中的重要性,雖然都是英文的,但是,現在可以翻譯的軟體的太多了,這不是問題

2. 安裝

所有的技術,不自己實際操作一下是不可以的,安裝上自己動手實踐一下,毛爺爺都說:實踐是檢驗真理的唯一標準

,不得不誇獎一下Logstash的工程師,真的太人性化了,下載後直接解壓,就可以了。

而且提供了很多的安裝方式供你選擇,舒服

3. helloword使用

開始我們今天的第一個實踐吧,就像我們剛開始學Java的時候,第一個命令就是helloworld,不知道各位還能不能手寫出來呢?來看一下logstash的第一個執行時怎麼處理的

通過命令列,進入到logstash/bin目錄,執行下面的命令:

input {
  kafka {
    type => "accesslogs"
    codec => "plain"
    auto_offset_reset => "smallest"
    group_id => "elas1"
    topic_id => "accesslogs"
    zk_connect => "172.16.0.11:2181,172.16.0.12:2181,172.16.0.13:2181"
  }
 
  kafka {
    type => "gamelogs"
    auto_offset_reset => "smallest"
    codec => "plain"
    group_id => "elas2"
    topic_id => "gamelogs"
    zk_connect => "172.16.0.11:2181,172.16.0.12:2181,172.16.0.13:2181"
  }
}
 
filter {
  if [type] == "accesslogs" {
    json {
      source => "message"
  remove_field => [ "message" ]
  target => "access"
    }
  }
 
  if [type] == "gamelogs" {
    mutate {
      split => { "message" => " " }
      add_field => {
        "event_type" => "%{message[3]}"
        "current_map" => "%{message[4]}"
        "current_X" => "%{message[5]}"
        "current_y" => "%{message[6]}"
        "user" => "%{message[7]}"
        "item" => "%{message[8]}"
        "item_id" => "%{message[9]}"
        "current_time" => "%{message[12]}"
     }
     remove_field => [ "message" ]
   }
  }
}
 
output {
 
  if [type] == "accesslogs" {
    elasticsearch {
      index => "accesslogs"
  codec => "json"
      hosts => ["172.16.0.14:9200", "172.16.0.15:9200", "172.16.0.16:9200"]
    }
  }
 
  if [type] == "gamelogs" {
    elasticsearch {
      index => "gamelogs"
      codec => plain {
        charset => "UTF-16BE"
      }
      hosts => ["172.16.0.14:9200", "172.16.0.15:9200", "172.16.0.16:9200"]
    }
  }
}

  可以看到提示下面資訊(這個命令稍後介紹),輸入hello world!

  可以看到logstash為我們自動添加了幾個欄位:

時間戳:@ timestamp

版本:@ version

輸入的型別:type

主機名:host。

4.1. 簡單的工作原理

  Logstash使用管道方式進行日誌的蒐集處理和輸出。有點類似*NIX系統的管道命令 xxx | ccc | ddd,xxx執行完了會執行ccc,然後執行ddd。

  在logstash中,包括了三個階段:

  輸入input --> 處理filter(不是必須的) --> 輸出output

  每個階段都有很多的外掛配合工作,比如file、elasticsearch、redis等等。

  每個階段也可以指定多種方式,比如輸出既可以輸出到elasticsearch中,也可以指定到stdout在控制檯列印。

  由於這種外掛式的組織方式,使得logstash變得易於擴充套件和定製。

4.2. 命令列中常用的命令

  -f:通過這個命令可以指定Logstash的配置檔案,根據配置檔案配置logstash

  -e:後面跟著字串,該字串可以被當做logstash的配置(如果是“” 則預設使用stdin作為輸入,stdout作為輸出)

  -l:日誌輸出的地址(預設就是stdout直接在控制檯中輸出)

  -t:測試配置檔案是否正確,然後退出。

4.3. 配置檔案說明

  前面介紹過logstash基本上由三部分組成,input、output以及使用者需要才新增的filter,因此標準的配置檔案格式如下:

input {...}
filter {...}
output {...}

  在每個部分中,也可以指定多個訪問方式,例如我想要指定兩個日誌來原始檔,則可以這樣寫:

input {
 file { path =>"/var/log/messages" type =>"syslog"}
 file { path =>"/var/log/apache/access.log" type =>"apache"}
}

  類似的,如果在filter中添加了多種處理規則,則按照它的順序一一處理,但是有一些外掛並不是執行緒安全的。

  比如在filter中指定了兩個一樣的的外掛,這兩個任務並不能保證準確的按順序執行,因此官方也推薦避免在filter中重複使用外掛。

說完這些,簡單的建立一個配置檔案的小例子看看:

input {
file {
   #指定監聽的檔案路徑,注意必須是絕對路徑
        path => "E:/software/logstash-1.5.4/logstash-1.5.4/data/test.log"
        start_position => beginning
    }
}
filter {
    
}
output {
    stdout {}
}

日誌大致如下:注意最後有一個空行。

1 hello,this is first line in test.log!
2 hello,my name is xingoo!
3 goodbye.this is last line in test.log!
4

 執行命令得到如下資訊:

5. 最常用的input外掛——file。

 這個外掛可以從指定的目錄或者檔案讀取內容,輸入到管道處理,也算是logstash的核心外掛了,大多數的使用場景都會用到這個外掛,因此這裡詳細講述下各個引數的含義與使用。

5.1. 最小化的配置檔案

在Logstash中可以在 input{} 裡面新增file配置,預設的最小化配置如下:

input {
    file {
        path => "E:/software/logstash-1.5.4/logstash-1.5.4/data/*"
    }
}
filter {
    
}
output {
    stdout {}
}

當然也可以監聽多個目標檔案:

input {
    file {
        path => ["E:/software/logstash-1.5.4/logstash-1.5.4/data/*","F:/test.txt"]
    }
}
filter {
    
}
output {
    stdout {}
}

5.2. 其他的配置

另外,處理path這個必須的項外,file還提供了很多其他的屬性:

input {
    file {
        #監聽檔案的路徑
        path => ["E:/software/logstash-1.5.4/logstash-1.5.4/data/*","F:/test.txt"]
 
        #排除不想監聽的檔案
        exclude => "1.log"
        
        #新增自定義的欄位
        add_field => {"test"=>"test"}
 
        #增加標籤
        tags => "tag1"
 
        #設定新事件的標誌
        delimiter => "\n"
 
        #設定多長時間掃描目錄,發現新檔案
        discover_interval => 15
 
        #設定多長時間檢測檔案是否修改
        stat_interval => 1
 
         #監聽檔案的起始位置,預設是end
        start_position => beginning
 
        #監聽檔案讀取資訊記錄的位置
        sincedb_path => "E:/software/logstash-1.5.4/logstash-1.5.4/test.txt"
 
        #設定多長時間會寫入讀取的位置資訊
        sincedb_write_interval => 15
        
    }
}
filter {
    
}
output {
    stdout {}
}

  其中值得注意的是:

  1 path

  是必須的選項,每一個file配置,都至少有一個path

  2 exclude

  是不想監聽的檔案,logstash會自動忽略該檔案的監聽。配置的規則與path類似,支援字串或者陣列,但是要求必須是絕對路徑。

  3 start_position

  是監聽的位置,預設是end,即一個檔案如果沒有記錄它的讀取資訊,則從檔案的末尾開始讀取,也就是說,僅僅讀取新新增的內容。對於一些更新的日誌型別的監聽,通常直接使用end就可以了;相反,beginning就會從一個檔案的頭開始讀取。但是如果記錄過檔案的讀取資訊,這個配置也就失去作用了。

  4 sincedb_path

  這個選項配置了預設的讀取檔案資訊記錄在哪個檔案中,預設是按照檔案的inode等資訊自動生成。其中記錄了inode、主裝置號、次裝置號以及讀取的位置。因此,如果一個檔案僅僅是重新命名,那麼它的inode以及其他資訊就不會改變,因此也不會重新讀取檔案的任何資訊。類似的,如果複製了一個檔案,就相當於建立了一個新的inode,如果監聽的是一個目錄,就會讀取該檔案的所有資訊。

  5 其他的關於掃描和檢測的時間,按照預設的來就好了,如果頻繁建立新的檔案,想要快速監聽,那麼可以考慮縮短檢測的時間。

  //6 add_field
  #這個技術感覺挺六的,但是其實就是增加一個欄位,例如:
file {
     add_field => {"test"=>"test"}
        path => "D:/tools/logstash/path/to/groksample.log"
        start_position => beginning
    }

6.  Kafka與Logstash的資料採集對接

基於Logstash跑通Kafka還是需要注意很多東西,最重要的就是理解Kafka的原理。

6.1. Logstash工作原理

由於Kafka採用解耦的設計思想,並非原始的釋出訂閱,生產者負責產生訊息,直接推送給消費者。而是在中間加入持久化層——broker,生產者把資料存放在broker中,消費者從broker中取資料。這樣就帶來了幾個好處:

1 生產者的負載與消費者的負載解耦

2 消費者按照自己的能力fetch資料

3 消費者可以自定義消費的數量

另外,由於broker採用了主題topic-->分割槽的思想,使得某個分割槽內部的順序可以保證有序性,但是分割槽間的資料不保證有序性。這樣,消費者可以以分割槽為單位,自定義讀取的位置——offset。

Kafka採用zookeeper作為管理,記錄了producer到broker的資訊,以及consumer與broker中partition的對應關係。因此,生產者可以直接把資料傳遞給broker,broker通過zookeeper進行leader-->followers的選舉管理;消費者通過zookeeper儲存讀取的位置offset以及讀取的topic的partition分割槽資訊。

由於上面的架構設計,使得生產者與broker相連;消費者與zookeeper相連。有了這樣的對應關係,就容易部署logstash-->kafka-->logstash的方案了。

接下來,按照下面的步驟就可以實現logstash與kafka的對接了。

6.2. 啟動kafka

##啟動zookeeper:
$zookeeper/bin/zkServer.sh start

##啟動kafka:
$kafka/bin/kafka-server-start.sh $kafka/config/server.properties &

6.3. 建立主題

#建立主題:
$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1

#檢視主題:
$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe

6.4. 測試環境

#執行生產者指令碼:
$kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello

#執行消費者指令碼,檢視是否寫入:
$kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello

6.5. 向kafka中輸出資料

input{
       stdin{}
      }
output{
       kafka{
       topic_id => "hello" 
       bootstrap_servers => "192.168.0.4:9092,172.16.0.12:9092" 
       # kafka的地址 
       batch_size => 5
  codec => plain {
format => "%{message}"
charset => "UTF-8"
  }
      }
stdout{
       codec => rubydebug
      }
}

6.6. 從kafka中讀取資料

logstash配置檔案:

input{
      kafka {
              codec => "plain" 
              group_id => "logstash1" 
              auto_offset_reset => "smallest" 
              reset_beginning => true 
              topic_id => "hello" 
              zk_connect => "192.168.0.5:2181" 
              }
       }
output{
       stdout{
               codec => rubydebug
               }
       }

7. Filter

7.1. 過濾外掛grok元件

#日誌
55.3.244.1 GET /index.html 15824 0.043
 
bin/logstash -e '
input { stdin {} }
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output { stdout {codec => rubydebug} }'
 

7.2. 分割外掛split

filter {
  mutate {
    split => { "message" => " " }
      add_field => {
        "event_type" => "%{message[3]}"
        "current_map" => "%{message[4]}"
        "current_X" => "%{message[5]}"
        "current_y" => "%{message[6]}"
        "user" => "%{message[7]}"
        "item" => "%{message[8]}"
        "item_id" => "%{message[9]}"
        "current_time" => "%{message[12]}"
     }
     remove_field => [ "message" ]
  }
}

四、 Kibana報表工具的安裝和使用

1. 簡介

Logstash 早期曾經自帶了一個特別簡單的 logstash-web 用來檢視 ES 中的資料。其功能太過簡單,於是產生了Kibana。不過是用PHP編寫,後來為了滿足更多的使用需求,懶人推動科技的進步嘛,並且Logstash使用ruby進行編寫,所以重新編寫Kibana,直到現在,Kibana因為重構,導致3,4某些情況下不相容,所以出現了一山容二虎的情況,具體怎麼選擇,可以根據業務場景進行實際分析

在Kibana眾多的優秀特性中,我個人最喜歡的是這一個特性,我起名叫包容性

  因為在官網介紹中,Kibana可以非常方便地把來自Logstash、ES-Hadoop、Beats或第三方技術的資料整合到Elasticsearch,支援的第三方技術包括Apache Flume、Fluentd等。這也就表明我在日常的開發工作中,對於技術選型和操作的時候,我可以有更多的選擇,在開發時也能找到相應的開發例項,節省了大量的開發時間

ps:有一次體現了官網的重要性,真的,有時候官網可以幫你解決大多數的問題,有時間可以去看一下官網啊,好了,話不多說,看正題

2. 安裝

下載安裝包後解壓

編輯檔案config/kibana.yml ,配置屬性:

[root@H32 ~]# cd kibana/config/
 [root@H32 config]# vim kibana.yml
 //新增:
 server.host: "192.168.80.32"
elasticsearch.url: "http://172.16.0.14:9200"

先啟動ES,然後再啟動

cd /usr/local/kibana530bin/kibana

注意:

1、kibana必須是在root下執行,否則會報錯,啟動失敗

2、下載解壓安裝包,一定要裝與ES相同的版本

3. 匯入資料

我們將使用莎士比亞全集作為我們的示例資料。要更好的使用 Kibana,你需要為自己的新索引應用一個對映集(mapping)。我們用下面這個對映集建立"莎士比亞全集"索引。實際資料的欄位比這要多,但是我們只需要指定下面這些欄位的對映就可以了。注意到我們設定了對 speaker 和 play_name 不分析。原因會在稍後講明。

在終端執行下面命令:

curl -XPUT http://localhost:9200/shakespeare -d '
{
 "mappings" : {
  "_default_" : {
   "properties" : {
    "speaker" : {"type": "string", "index" : "not_analyzed" },
    "play_name" : {"type": "string", "index" : "not_analyzed" },
    "line_id" : { "type" : "integer" },
    "speech_number" : { "type" : "integer" }
   }
  }
 }
}

我們這就建立好了索引。現在需要做的時匯入資料。莎士比亞全集的內容我們已經整理成了 elasticsearch 批量 匯入所需要的格式,你可以通過shakeseare.json下載。

用如下命令匯入資料到你本地的 elasticsearch 程序中。

curl -XPUT localhost:9200/_bulk --data-binary @shakespeare.json

4. 訪問 Kibana 介面

開啟瀏覽器,訪問已經發布了 Kibana 的本地伺服器。

如果你解壓路徑無誤(譯者注:使用 github 原始碼的讀者記住釋出目錄應該是 kibana/src/ 裡面),你已經就可以看到上面這個可愛的歡迎頁面。點選 Sample Dashboard 連結

好了,現在顯示的就是你的 sample dashboard!如果你是用新的 elasticsearch 程序開始本教程的,你會看到一個百分比佔比很重的餅圖。這裡顯示的是你的索引中,文件型別的情況。如你所見,99% 都是 lines,只有少量的 acts 和scenes。

在下面,你會看到一長段 JSON 格式的莎士比亞詩文。

5. 第一次搜尋

Kibana 允許使用者採用 Lucene Query String 語法搜尋 Elasticsearch 中的資料。請求可以在頁面頂部的請求輸入框中書寫。

在請求框中輸入如下內容。然後查看錶格中的前幾行內容。

friends, romans, countrymen

6. 配置另一個索引

目前 Kibana 指向的是 Elasticsearch 一個特殊的索引叫 _all。 _all 可以理解為全部索引的大集合。目前你只有一個索引, shakespeare,但未來你會有更多其他方面的索引,你肯定不希望 Kibana 在你只想搜《麥克白》裡心愛的句子的時候還要搜尋全部內容。

配置索引,點選右上角的配置按鈕:

在這裡,你可以設定你的索引為 shakespeare ,這樣 Kibana 就只會搜尋 shakespeare 索引的內容了。

這是因為 ES1.4 增強了許可權管理。你需要在 ES 配置檔案 elasticsearch.yml 中新增下列配置並重啟服務後才能正常訪問:

http.cors.enabled: true
http.cors.allow-origin: "*"

記住 kibana3 頁面也要重新整理快取才行。

此外,如果你可以很明確自己 kibana 以外沒有其他 http 訪問,可以把 kibana 的網址寫在http.cors.allow-origin 引數的值中。比如:

http.cors.allow-origin: "/https?:\/\/kbndomain/"

好了,到這裡就結束了,不知道有沒有收穫呀,有收穫的朋友給我點個贊吧~