1. 程式人生 > >Logstash 快速入門

Logstash 快速入門

簡介

Logstash是一個接收,處理,轉發日誌的工具。支援 系統 日誌,webserver日誌,錯誤日誌,應用日誌,總之包括所有可以丟擲來的日誌型別。怎麼樣聽起來挺厲害的吧?
在一個典型的使用場景下(ELK):用Elasticsearch作為後臺資料的儲存,kibana用來前端的報表展示。Logstash在其過程中擔任搬運工的角色,它為資料儲存,報表查詢和日誌解析建立了一個功能強大的管道鏈。Logstash提供了多種多樣的 input,filters,codecs和output 元件 ,讓使用者輕鬆實現強大的功能。好了讓我們開始吧

依賴條件:JAVA

Logstash執行僅僅依賴java執行環境(jre)。各位可以在命令列下執行java -version命令 顯示類似如下結果:
java -version
java version "1.7.0_45"
Java(TM) SE Runtime Environment (build 1.7.0_45-b18)
Java HotSpot(TM) 64-Bit Server VM (build 24.45-b08, mixed mode)
為了確保成功執行Logstash建議大家使用較近期的jre版本。 可以獲取開源版本的jre在:http://openjdk.java.net 或者你可以在官網下載 Oracle  jdk版本:http://www.oracle.com/technetwork/java/index.html 一旦jre已經成功在你的系統中安裝完成,我們就可以繼續了

啟動和執行Logstash的兩條命令示例

第一步我們先下載Logstash
curl -O https://download.elasticsearch.org/logstash/logstash/logstash-1.4.2.tar.gz
現在你應該有了一個叫logstash-1.4.2.tar.gz的檔案了。 我們把它解壓一下
tar zxvf logstash-1.4.2.tar.gz
cd logstash-1.4.2
現在我們來執行一下:
bin/logstash -e 'input { stdin { } } output { stdout {} }'
我們現在可以在命令列下輸入一些字元,然後我們將看到logstash的輸出內容:
hello world
2013-11-21T01:22:14.405+0000 0.0.0.0 hello world
Ok,還挺有意思的吧... 以上例子我們在執行logstash中,定義了一個叫"stdin"的input還有一個"stdout"的output,無論我們輸入什麼字元,Logstash都會按照某種格式來返回我們輸入的字元。這裡注意我們在命令列中使用了 -e 引數,該引數允許Logstash直接通過命令列接受設定。這點尤其快速的幫助我們反覆的測試配置是否正確而不用寫配置檔案。
讓我們再試個更有意思的例子。首先我們在命令列下使用CTRL-C命令退出之前執行的Logstash。現在我們重新執行Logstash使用下面的命令:
bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
我們再輸入一些字元,這次我們輸入"goodnight moon":
goodnight moon
{
  "message" => "goodnight moon",
  "@timestamp" => "2013-11-20T23:48:05.335Z",
  "@version" => "1",
  "host" => "my-laptop"
}
以上示例通過重新設定了叫"stdout"的output(添加了"codec"引數),我們就可以改變Logstash的輸出表現。類似的我們可以通過在你的配置檔案中新增或者修改inputs、outputs、filters,就可以使隨意的格式化日誌資料成為可能,從而訂製更合理的儲存格式為查詢提供便利。

使用Elasticsearch儲存日誌

現在,你也許會說:"它看起來還挺高大上的,不過手工輸入字元,並把字元從控制檯回顯出來。實際情況並不實用"。說的好,那麼接下來我們將建立Elasticsearch來儲存輸入到Logstash的日誌資料。如果你還沒有安裝Elasticsearch,你可以下載RPM/DEB包或者手動下載tar包,通過以下命令:
curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.1.1.tar.gz
tar zxvf elasticsearch-1.1.1.tar.gz
cd elasticsearch-1.1.1/
./bin/elasticsearch

注意  本篇文章例項使用Logstash 1.4.2和Elasticsearch 1.1.1。不同的Logstash版本都有對應的建議Elasticsearch版本。請確認你使用的Logstash版本!
更多有關安裝和設定Elasticsearch的資訊可以參考Elasticsearch官網。因為我們主要介紹Logstash的入門使用,Elasticsearch預設的安裝和配置就已經滿足我們要求。
言歸正專,現在Elasticsearch已經執行並監聽9200埠了(大家都搞定了,對嗎?),通過簡單的設定Logstash就可以使用Elasticsearch作為它的後端。預設的配置對於Logstash和Elasticsearch已經足夠,我們忽略一些額外的選項來設定elasticsearch作為output:
bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } }'
隨意的輸入一些字元,Logstash會像之前一樣處理日誌(不過這次我們將不會看到任何的輸出,因為我們沒有設定stdout作為output選項)
you know, for logs
我們可以使用curl命令傳送請求來檢視ES是否接收到了資料:
curl 'http://localhost:9200/_search?pretty'
返回內容如下:
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "logstash-2013.11.21",
      "_type" : "logs",
      "_id" : "2ijaoKqARqGvbMgP3BspJA",
      "_score" : 1.0, "_source" : {"message":"you know, for logs","@timestamp":"2013-11-21T18:45:09.862Z","@version":"1","host":"my-laptop"}
    } ]
  }
}
恭喜,至此你已經成功利用Elasticsearch和Logstash來收集日誌資料了。

Elasticsearch 外掛(題外話)

這裡介紹另外一個對於查詢你的Logstash資料(Elasticsearch中資料)非常有用的工具叫Elasticsearch-kopf外掛。更多的資訊請見Elasticsearch外掛。安裝elasticsearch-kopf,只要在你安裝Elasticsearch的目錄中執行以下命令即可:
bin/plugin -install lmenezes/elasticsearch-kopf
接下來訪問 http://localhost:9200/_plugin/kopf 來瀏覽儲存在Elasticsearch中的資料,設定及對映!

多重輸出

作為一個簡單的例子來設定多重輸出,讓我們同時設定stdout和elasticsearch作為output來重新執行一下Logstash,如下:
bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } stdout { } }'
當我們輸入了一些片語之後,這些輸入的內容回回顯到我們的終端,同時還會儲存到Elasticsearch!(可以使用curl和kopf外掛來驗證)。

預設配置 - 按照每日日期建立索引

你將發現Logstash可以足夠靈巧的在Elasticsearch上建立索引... 每天會按照預設格式是logstash-YYYY.MM.DD來建立索引。在午夜(GMT),Logstash自動按照時間戳更新索引。我們可以根據追溯多長時間的資料作為依據來制定保持多少資料,當然你也可以把比較老的資料遷移到其他的地方(重新索引)來方便查詢,此外如果僅僅是簡單的刪除一段時間資料我們可以使用Elasticsearch Curator。

接下來

接下來我們開始瞭解更多高階的配置項。在下面的章節,我們著重討論logstash一些核心的特性,以及如何和logstash引擎互動的。

事件的生命週期

Inputs,Outputs,Codecs,Filters構成了Logstash的核心配置項。Logstash通過建立一條事件處理的管道,從你的日誌提取出資料儲存到Elasticsearch中,為高效的查詢資料提供基礎。為了讓你快速的瞭解Logstash提供的多種選項,讓我們先討論一下最常用的一些配置。更多的資訊,請參考Logstash事件管道。
Inputs
input 及輸入是指日誌資料傳輸到Logstash中。其中常見的配置如下: file:從檔案系統中讀取一個檔案,很像UNIX命令 "tail -0a"syslog:監聽514埠,按照RFC3164標準解析日誌資料redis:從redis伺服器讀取資料,支援channel(釋出訂閱)和list模式。redis一般在Logstash消費叢集中作為"broker"角色,儲存events佇列共Logstash消費。lumberjack:使用lumberjack協議來接收資料,目前已經改為 logstash-forwarder。  Filters
Fillters 在Logstash處理鏈中擔任中間處理元件。他們經常被組合起來實現一些特定的行為來,處理匹配特定規則的事件流。常見的filters如下: grok:解析無規則的文字並轉化為有結構的格式。Grok 是目前最好的方式來將無結構的資料轉換為有結構可查詢的資料。有120多種匹配規則,會有一種滿足你的需要。mutate:mutate filter 允許改變輸入的文件,你可以從命名,刪除,移動或者修改欄位在處理事件的過程中。drop:丟棄一部分events不進行處理,例如:debug events。clone:拷貝 event,這個過程中也可以新增或移除欄位。geoip:新增地理資訊(為前臺kibana圖形化展示使用)  Outputs
outputs是logstash處理管道的最末端元件。一個event可以在處理過程中經過多重輸出,但是一旦所有的outputs都執行結束,這個event也就完成生命週期。一些常用的outputs包括: elasticsearch:如果你計劃將高效的儲存資料,並且能夠方便和簡單的進行查詢...Elasticsearch是一個好的方式。是的,此處有做廣告的嫌疑,呵呵。file:將event資料儲存到檔案中。graphite:將event資料傳送到圖形化元件中,一個很流行的開源儲存圖形化展示的元件。http://graphite.wikidot.com/。statsd:statsd是一個統計服務,比如技術和時間統計,通過udp通訊,聚合一個或者多個後臺服務,如果你已經開始使用statsd,該選項對你應該很有用。  Codecs
codecs 是基於資料流的過濾器,它可以作為input,output的一部分配置。Codecs可以幫助你輕鬆的分割傳送過來已經被序列化的資料。流行的codecs包括 json,msgpack,plain(text)。 json:使用json格式對資料進行編碼/解碼multiline:將匯多個事件中資料彙總為一個單一的行。比如:java異常資訊和堆疊資訊 獲取完整的配置資訊,請參考 Logstash文件中 "plugin configuration"部分。

更多有趣Logstash內容

使用配置檔案

使用-e引數在命令列中指定配置是很常用的方式,不過如果需要配置更多設定則需要很長的內容。這種情況,我們首先建立一個簡單的配置檔案,並且指定logstash使用這個配置檔案。如我們建立一個檔名是"logstash-simple.conf"的配置檔案並且儲存在和Logstash相同的目錄中。內容如下:
input { stdin { } }
output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}
接下來,執行命令:
bin/logstash -f logstash-simple.conf
我們看到logstash按照你剛剛建立的配置檔案來執行例子,這樣更加的方便。注意,我們使用-f引數來從檔案獲取而代替之前使用-e引數從命令列中獲取配置。以上演示非常簡單的例子,當然解析來我們繼續寫一些複雜一些的例子。

過濾器

filters是一個行處理機制將提供的為格式化的資料整理成你需要的資料,讓我們看看下面的一個例子,叫grok filter的過濾器。
input { stdin { } }

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}
執行Logstash按照如下引數:
bin/logstash -f logstash-filter.conf
現在貼上下面一行資訊到你的終端(當然Logstash就會處理這個標準的輸入內容):
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
你將看到類似如下內容的反饋資訊:
{
        "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
     "@timestamp" => "2013-12-11T08:01:45.000Z",
       "@version" => "1",
           "host" => "cadenza",
       "clientip" => "127.0.0.1",
          "ident" => "-",
           "auth" => "-",
      "timestamp" => "11/Dec/2013:00:01:45 -0800",
           "verb" => "GET",
        "request" => "/xampp/status.php",
    "httpversion" => "1.1",
       "response" => "200",
          "bytes" => "3891",
       "referrer" => "\"http://cadenza/xampp/navi.php\"",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
}
正像你看到的那樣,Logstash(使用了grok過濾器)能夠將一行的日誌資料(Apache的"combined log"格式)分割設定為不同的資料欄位。這一點對於日後解析和查詢我們自己的日誌資料非常有用。比如:HTTP的返回狀態碼,IP地址相關等等,非常的容易。很少有匹配規則沒有被grok包含,所以如果你正嘗試的解析一些常見的日誌格式,或許已經有人為了做了這樣的工作。如果檢視詳細匹配規則,參考logstash grok patterns。
另外一個過濾器是date filter。這個過濾器來負責解析出來日誌中的時間戳並將值賦給timestame欄位(不管這個資料是什麼時候收集到logstash的)。你也許注意到在這個例子中@timestamp欄位是設定成December 11, 2013, 說明logstash在日誌產生之後一段時間進行處理的。這個欄位在處理日誌中回添到資料中的,舉例來說... 這個值就是logstash處理event的時間戳。

實用的例子

Apache 日誌(從檔案獲取)

現在,讓我們使用一些非常實用的配置... apache2訪問日誌!我們將從本地讀取日誌檔案,並且通過條件設定處理滿足我們需要的event。首先,我們建立一個檔名是logstash-apache.conf的配置檔案,內容如下(你可以根據實際情況修改你的檔名和路徑):
input {
  file {
    path => "/tmp/access_log"
    start_position => beginning
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { "type" => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    host => localhost
  }
  stdout { codec => rubydebug }
}
接下來,我們按照上面的配置建立一個檔案(在例子中是"/tmp/access.log"),可以將下面日誌資訊作為檔案內容(也可以用你自己的webserver產生的日誌):
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
現在使用-f引數來執行一下上面的例子:
bin/logstash -f logstash-apache.conf
你可以看到apache的日誌資料已經匯入到ES中了。這裡logstash會按照你的配置讀取,處理指定的檔案,任何後新增到檔案的內容也會被捕獲處理最後儲存到ES中。此外,資料中type的欄位值會被替換成"apache_access"(這個功能在配置中已經指定)。
這個配置只是讓Logstash監控了apache access_log,但是在實際中往往並不夠用可能還需要監控error_log,只要在上面的配置中改變一行既可以實現,如下:
input {
  file {
    path => "/tmp/*_log"
...

現在你可以看到logstash處理了error日誌和access日誌。然而,如果你檢查了你的資料(也許用elasticsearch-kopf),你將發現access_log日誌被分成不同的欄位,但是error_log確沒有這樣。這是因為我們使用了“grok”filter並僅僅配置匹配combinedapachelog日誌格式,這樣滿足條件的日誌就會自動的被分割成不同的欄位。我們可以通過控制日誌按照它自己的某種格式來解析日誌,不是很好的嗎?對吧。
此外,你也許還會發現Logstash不會重複處理檔案中已經處理過得events。因為Logstash已經記錄了檔案處理的位置,這樣就只處理檔案中新加入的行數。漂亮!

條件判斷

我們利用上一個例子來介紹一下條件判斷的概念。這個概念一般情況下應該被大多數的Logstash使用者熟悉掌握。你可以像其他普通的 程式設計 語言一樣來使用if,else if和else語句。讓我們把每個event依賴的日誌檔案型別都標記出來(access_log,error_log其他以log結尾的日誌檔案)。
input {
  file {
    path => "/tmp/*_log"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else if [path] =~ "error" {
    mutate { replace => { type => "apache_error" } }
  } else {
    mutate { replace => { type => "random_logs" } }
  }
}

output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}

我想你已經注意到了,我們使用"type"欄位來標記每個event,但是我們實際上沒有解析"error"和”random"型別的日誌... 而實際情況下可能會有很多很多型別的錯誤日誌,如何解析就作為練習留給各位讀者吧,你可以依賴已經存在的日誌。

Syslog

Ok,現在我們繼續瞭解一個很實用的例子:syslog。Syslog對於Logstash是一個很長用的配置,並且它有很好的表現(協議格式符合RFC3164)。Syslog實際上是UNIX的一個網路日誌標準,由客戶端傳送日誌資料到本地檔案或者日誌伺服器。在這個例子中,你根本不用建立syslog例項;我們通過命令列就可以實現一個syslog服務,通過這個例子你將會看到發生什麼。
首先,讓我們建立一個簡單的配置檔案來實現logstash+syslog,檔名是 logstash-syslog.conf
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}

執行logstash:
bin/logstash -f logstash-syslog.conf

通常,需要一個客戶端連結到Logstash伺服器上的5000埠然後傳送日誌資料。在這個簡單的演示中我們簡單的使用telnet連結到logstash伺服器傳送日誌資料(與之前例子中我們在命令列標準輸入狀態下發送日誌資料類似)。首先我們開啟一個新的shell視窗,然後輸入下面的命令:
telnet localhost 5000
你可以複製貼上下面的樣例資訊(當然也可以使用其他字元,不過這樣可能會被grok filter不能正確的解析):
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.

之後你可以在你之前執行Logstash的視窗中看到輸出結果,資訊被處理和解析!
{
                 "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
              "@timestamp" => "2013-12-23T22:30:01.000Z",
                "@version" => "1",
                    "type" => "syslog",
                    "host" => "0:0:0:0:0:0:0:1:52617",
        "syslog_timestamp" => "Dec 23 14:30:01",
         "syslog_hostname" => "louis",
          "syslog_program" => "CRON",
              "syslog_pid" => "619",
          "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
             "received_at" => "2013-12-23 22:49:22 UTC",
           "received_from" => "0:0:0:0:0:0:0:1:52617",
    "syslog_severity_code" => 5,
    "syslog_facility_code" => 1,
         "syslog_facility" => "user-level",
         "syslog_severity" => "notice"
}

恭喜各位,看到這裡你已經成為一個合格的Logstash使用者了。你將可以輕鬆的配置,執行Logstash,還可以傳送event給Logstash,但是這個過程隨著使用還會有很多值得深挖的地方。