1. 程式人生 > >資料收集之Logstash

資料收集之Logstash

Logstash

之前用的Logstash快忘了,好記性不如爛筆頭,好好總結一下。
Logstash由Java(Core)+Ruby(Plugin)語言編寫,是一個開源的日誌收集、處理、轉發工具。
Input產生事件,Filter修改事件,Output將事件傳送到其他地方。
注:經常看到的編解碼器codec也是過濾器,可以作為輸入或輸出的一部分,對流進行編解碼。

簡單部署

#下載解壓
[[email protected] ~/software]$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2
.tar.gz [[email protected] ~/software]$ tar -zxvf logstash-6.3.2.tar.gz #自檢-接收標準輸入並輸出到控制檯 [[email protected] ~/software/logstash-6.3.2]$ bin/logstash -e 'input { stdin { } } output { stdout {} }' 輸入 Hello 輸出 { "@version" => "1", "message" => "Hello", "@timestamp"
=> 2018-08-11T11:49:31.059Z, "host" => "localhost.local" }

Mysql增量同步至 Kafka/Hdfs/Elasticsearch

Mysql 測試庫表

生產中一個庫同一類資料一般會有多張分表如16張分表,256張分表。

#建庫 user_logs
create database user_logs;

#建表-使用者瀏覽日誌表: user_browse_0_logs,user_browse_1_logs 如
CREATE TABLE `user_browse_0_logs`
( `id` int(11) NOT NULL AUTO_INCREMENT, `product_id` int(11) NOT NULL, `event_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `uid` varchar(10) NOT NULL, PRIMARY KEY (`id`), KEY `event_time` (`event_time`) USING BTREE, KEY `uid_event_time` (`uid`,`event_time`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='使用者瀏覽日誌表';

Logstash 配置

基礎配置

#配置目錄
[wangpei@localhost ~/software/logstash-6.3.2]$ mkdir db_test

#拷貝預設配置
[wangpei@localhost ~/software/logstash-6.3.2]$ cp config/* db_test/

#db_test/jvm.options
#jvm配置 
-Xms1g
-Xmx1g

#db_test/logstash.yml
#持久化目錄 
#logstash和plugin任何需要持久化的資料都會放在這個目錄
path.data: /Users/wangpei/software/logstash-6.3.2/data/user_browse_logs/data

#queue  
#預設memory,logstash程序重啟會丟資料
queue.type: persisted

#併發度
pipeline.workers: 2

#每個執行緒從輸入收集的最大數量
pipeline.batch.size: 125

Input-Filter-Output配置

[wangpei@localhost ~/software/logstash-6.3.2]$ vim db_test/user_browse_logs.conf
#輸入配置 jdbc外掛
input {
  jdbc {
    #jdbc驅動
    jdbc_driver_library => "/Users/wangpei/software/logstash-6.3.2/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    #jdbc連線
    jdbc_connection_string => "jdbc:mysql://node2:3306/user_logs"
    jdbc_user => "root"
    jdbc_password => "111"
    #任務排程
    schedule => "* * * * *"
    #上次執行的sql_last_value儲存路徑
    #啟動前需要確保該檔案已存在
    last_run_metadata_path => "/Users/wangpei/software/logstash-6.3.2/data/test/metadata/user_browse_0_logs"
    #sql語句
    #按主鍵自增id 增量同步
    #input在拿到欄位為日期型別的資料時,一定會減8小時並帶上時區資訊
    #如原始mysql event_time欄位為日期型別(這裡是timestamp型別)的資料2018-08-12 14:38:24 =>經過input後=> 2018-08-12T14:38:24.000Z
    #不想讓input修改原始資料,這裡直接把event_time轉成成字串型別。
    statement => "SELECT id,product_id,DATE_FORMAT(event_time, '%Y-%m-%d %H:%i:%s') as event_time,uid FROM user_browse_0_logs WHERE id > :sql_last_value"
    #設定成true 用tracking_column列的值去更新上次記錄的sql_last_value值
    use_column_value => true
    #追蹤的列
    tracking_column => "id"
    #追蹤的列的資料型別 支援["numeric", "timestamp"]
    tracking_column_type => "numeric"
    #外掛id 便於監控
    id => "user_browse_logs_input_jdbc"
    #新增欄位 表名 便於後期DQC 如mysql 每個分表的資料和HDFS每個分割槽資料count、sum驗證
    add_field => {"table_name" => "user_browse_0_logs"}
    #input event輸出格式 每個event轉換成json+換行符格式
    codec => json_lines {charset => "UTF-8"}
  }
}

#Filter配置
filter {
  #預設會對每條資料加上@timestamp
  #不指定@timestamp欄位來源,預設會以logstash伺服器收到資料時間為準
  date {
        #匹配到這種格式就會替換成目標欄位,目標欄位預設是@timestamp
        #貌似不管怎麼設定,Logstash中 @timestamp永遠是0時區
        #@timestamp 0時區,資料寫入ES,@timestamp 在Kibana上顯示時會根據瀏覽器市區自動調整 如自動+8
        match => ["event_time" , "yyyy-MM-dd HH:mm:ss"]
        #timezone => "Asia/Shanghai"
    #target => "@timestamp"
    }
  #增加dt欄位,該欄位來源於event_time,用於hdfs按天分割槽、ES按天索引
  #總結:
  #假設源資料是北京時間
  #1)資料同步到ES:(需要兼顧kibana各種作圖、Java/Python API查詢)
  #   A、按北京時間建索引如下dt
  #   B、kibana視覺化,時間選擇用@timestamp時間欄位,避免用event_time時間欄位做不了Date Histogram問題。
  #   C、ES API查詢,時間選擇用event_time時間欄位更方便。
  #2)資料同步到HDFS: 按北京時間分割槽 如下dt即可
  ruby {code => "event.set('dt', event.get('event_time')[0,10].gsub('-',''));"}
}

#輸出配置
output {
  #輸出到控制檯
  stdout { codec => rubydebug}
  #輸出到kafka
  kafka {
    #kafka brokers
    bootstrap_servers => "localhost:9092"
    #topic
    topic_id => "testTopic3"
    #ack
    acks => "1"
    #輸出格式
    codec => json
    #id
    id => "user_browse_logs_ouput_kafka"
  }
  #輸出到HDFS
  #需要開啟HDFS 的webhdfs
  webhdfs {
    codec => json_lines {charset => "UTF-8"}
    #namenode
    host => 'node1'
    port => 50070
    #standby namenode
    standby_host => 'node4'
    standby_port => 50070
    #user
    user => 'root'
    #user需要對path有讀寫許可權
    path => '/data/user_logs/user_browse_logs/%{dt}/%{table_name}.log'
    #攢到多少條event時向hdfs寫一次資料
    flush_size => 3
    #間隔多久向hdfs寫一次資料 單位:秒
    idle_flush_time => 2
  }
  #輸出到elasticsearch
  elasticsearch {
         #es節點列表
         hosts => ["localhost:9200"]
         #索引使用的模板
         #模板中定義了mapping(日期Format,某個欄位使用的分詞器)和setting
     template_name => "template.user_browse_logs"
         #index
         #資料是北京時間,這個按北京時間建索引
         index => "user_browse_logs.%{dt}"
     #type
         document_type =>"browse_logs"
         #許可權
         user => "logstash_input"
         password => "logstash_input"
         codec => json {charset => "UTF-8"}
    }
}

啟動方式

#直接啟動===> 一個input-filter-output配置獨享一個logstash程序
[[email protected] ~/software/logstash-6.3.2]$ bin/logstash --path.settings db_test/  -f db_test/user_browse_logs.conf --config.reload.automatic

#pipeline方式啟動===> 多個input-filter-output配置共享一個logstash程序
[[email protected] ~/software/logstash-6.3.2]$ bin/logstash --path.settings db_test/  --config.reload.automatic
需配置db_test/pipelines.yml
配置多個pipeline。每個pipeline可對應一個表(input-filter-output)。
- pipeline.id: user_logs.user_browse_logs
  queue.type: persisted
  pipeline.workers: 1
  queue.max_bytes: 32mb
  path.config: "/Users/wangpei/software/logstash-6.3.2/db_test/user_browse_logs.conf"

- pipeline.id: user_logs.user_pay_logs
  queue.type: persisted
  pipeline.workers: 1
  queue.max_bytes: 32mb
  path.config: "/Users/wangpei/software/logstash-6.3.2/db_test/user_pay_logs.conf"
  ...

結果

mysql插入資料

insert into user_browse_0_logs(product_id,uid) values(1,"0");
insert into user_browse_0_logs(product_id,uid) values(2,"0");
insert into user_browse_0_logs(product_id,uid) values(3,"0");

mysql> select * from user_browse_0_logs;
+----+------------+---------------------+-----+
| id | product_id | event_time          | uid |
+----+------------+---------------------+-----+
|  1 |          1 | 2018-08-12 20:18:31 | 0   |
|  2 |          2 | 2018-08-12 20:18:31 | 0   |
|  3 |          3 | 2018-08-12 20:18:32 | 0   |
+----+------------+---------------------+-----+

kafka收到的資料

[wangpei@localhost ~/software/logstash-6.2.4]$ kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic3

{"@version":"1","dt":"20180812","product_id":3,"uid":"0","id":3,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:32","@timestamp":"2018-08-12T12:18:32.000Z"}
{"@version":"1","dt":"20180812","product_id":1,"uid":"0","id":1,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:31","@timestamp":"2018-08-12T12:18:31.000Z"}
{"@version":"1","dt":"20180812","product_id":2,"uid":"0","id":2,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:31","@timestamp":"2018-08-12T12:18:31.000Z"}

hdfs收到的資料

#分割槽:20180812
#分割槽下的檔案:user_browse_0_logs.log user_browse_0_logs是表名
#該路徑來自:output webhdfs path引數 path => '/data/user_logs/user_browse_logs/%{dt}/%{table_name}.log'

[root@node2 /root]# hdfs dfs -cat /data/user_logs/user_browse_logs/20180812/user_browse_0_logs.log
{"@version":"1","dt":"20180812","product_id":1,"uid":"0","id":1,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:31","@timestamp":"2018-08-12T12:18:31.000Z"}
{"@version":"1","dt":"20180812","product_id":2,"uid":"0","id":2,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:31","@timestamp":"2018-08-12T12:18:31.000Z"}
{"@version":"1","dt":"20180812","product_id":3,"uid":"0","id":3,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:32","@timestamp":"2018-08-12T12:18:32.000Z"}

es收到的資料

image

檢視jdbc偏移量記錄檔案

#該路徑來自input jdbc外掛 last_run_metadata_path引數
last_run_metadata_path => "/Users/wangpei/software/logstash-6.3.2/data/test/metadata/user_browse_0_logs"

#此時id的偏移量是3,下次取id>3的資料。這樣就實現了增量同步
[wangpei@localhost ~/software/logstash-6.3.2]$ cat /Users/wangpei/software/logstash-6.3.2/data/test/metadata/user_browse_0_logs
--- 3

檢視logstash持久化的資料

#持久化目錄下包含2個目錄,一個檔案
#該目錄來自於logstash.yml中 path.data: /Users/wangpei/software/logstash-6.3.2/data/user_browse_logs/data

[wangpei@localhost ~/software/logstash-6.3.2]$ ls -l /Users/wangpei/software/logstash-6.3.2/data/user_browse_logs/data
total 8
drwxr-xr-x  2 wangpei  staff   68  8 12 19:38 dead_letter_queue
drwxr-xr-x  4 wangpei  staff  136  8 12 19:57 queue
-rw-r--r--  1 wangpei  staff   36  8 12 19:38 uuid

#uuid:logstash agent uuid
#queue:input → queue → filter + output。queue配置為persisted時,input會把資料先寫入此持久化目錄,然後再由filter + output處理。
#dead_letter_queue:DLQ佇列。死信佇列儲存響應程式碼為400或404的情況,兩者都表示無法重試的事件。
如資料input=>filter=>output(es),
一條資料出現mapping error,此時如果在logstash.yml中配置了dead_letter_queue.enable: true,
就會將這條資料放在DLQ對應的持久化目錄中。然後可以通過DLQ(Dead Letter Queues) Input => Filter(再處理)=>Output ES。

附:Kafka Input

input{
    kafka{
        codec => json {charset => "UTF-8"}
        #消費者組
        group_id => "test_topic_consumer1"
        #從最新開始消費
        auto_offset_reset => "latest"
        #消費的topics
        topics => ["test_topic"]
        #bootstrap server
        bootstrap_servers => "localhost:9092"
        #注意:執行緒數最好與partition數目一樣多,超過partition數量的執行緒會閒置。
        #kafka topic 一個partition同一時刻只能有一個執行緒去消費。
        consumer_threads =>3
    }
}

Logstash監控

可通過X-Pack以UI的方式檢視監控資訊,也可通過監控API檢視。
監控API主要有四類:節點資訊、外掛資訊、節點狀態統計、熱執行緒統計。
- 節點資訊

介面返回os、jvm、pipelines配置資訊 如jvm最大最小堆、pipelines worker數。
http://localhost:9600/_node/<os/pipelines/jvm>?pretty 
  • 外掛資訊
列出所有可用外掛。
http://localhost:9600/_node/plugins?pretty
  • 節點狀態統計
http://localhost:9600/_node/stats/<type>?pretty
type=jvm 獲取jvm統計資訊。如jvm當前執行緒數、執行緒峰值、記憶體各區(youngoldsurvivor)記憶體使用情況、gc次數、gc時間
type=process 獲取程序統計資訊。如記憶體消耗和CPU使用情況
type=events 獲取event相關統計資訊。如輸入、filter、輸出中的event數目
type=pipelines 獲取每個pipeline統計資訊、
  • 熱執行緒統計
獲取當前熱執行緒。熱執行緒是一個Java執行緒,具有較高的CPU使用率並且執行時間超過正常時間。
http://localhost:9600/_node/hot_threads?pretty

注意

  • 一個input多個output
如一個jdbc,多個output(hdfs、kafka、es),kafka和es連線不上,input會繼續輸入,偏移量如自增id會繼續增加,配置了queue.type: persisted,output plugin kafka/es正常後,會重發歷史資料到kafka/es。原因:input在確認資料發往queue後就返回了。
  • jdbc全量匯入
去掉jdbc sql語句中where條件即可。
  • jdbc增量匯入給從庫同步留下時間
#如果logstash基於從庫按自增id做增量同步,由於事務提交順序並不一定是按id從小打到的順序,id雖然是自增id,但id=100的資料可能先於id=96的資料入庫。
#如若此時logstash同步了id=100的資料(id=96的資料還未入庫),那sql_last_value就一定會>=100,下次再導的時候,只會取id>sql_last_value的資料,就會丟失了id=96這條資料。
#解決方式:給從庫同步留下時間,最大隻導1分鐘之前的資料(從庫同步延遲在毫秒級別),即等id=96也入庫後再導。
#實踐中,注意從庫的延遲。如果從庫延遲較大,也應適當增加等待間隔。
#statement改為:
statement => "SELECT id,product_id,DATE_FORMAT(event_time, '%Y-%m-%d %H:%i:%s') as event_time,uid FROM user_browse_0_logs WHERE id > :sql_last_value and event_time < date_sub(now(),interval 1 minute)"

相關推薦

資料收集Logstash

Logstash 之前用的Logstash快忘了,好記性不如爛筆頭,好好總結一下。 Logstash由Java(Core)+Ruby(Plugin)語言編寫,是一個開源的日誌收集、處理、轉發工具。 Input產生事件,Filter修改事件,Output將事件傳

第三節 ElasticSearch資料匯入Logstash

一、簡介 Logstash 是一款強大的資料處理工具,它可以實現資料傳輸,格式處理,格式化輸出,還有強大的外掛功能,常用於日誌處理、或一些具有一定格式的資料匯入到ES的處理。 工作流程 Logstash 工作的三個階段:

分散式日誌收集Logstash 筆記(二)

今天是2015年11月06日,早上起床,北京天氣竟然下起了大雪,不錯,最近幾年已經很少見到雪了,想起小時候冬天的樣子,回憶的影子還是歷歷在目。  進入正題吧,上篇介紹了Logstash的基礎知識和入門demo,本篇介紹幾個比較常用的命令和案例  通過上篇介紹,我們大體知道了整個logstash處理日誌的流程

資料收集Filebeat

  Filebeat採用Go語言開發,也可用於日誌收集,相較於的Logstash,更輕量,資源佔用更少。一般部署在日誌收集的最前端。   本文基於Filebeat 6.3.2總結。 設計要點 主要元件   Filebeat主要由兩大元件組成:

資料收集Flume

Flume最初由Cloudera開發,於2011年6月貢獻給Apache,於2012成為頂級專案。在孵化這一年,基於老版本的Flume(Flume OG:Flume Original Generation 即Flume 0.9.x版本)進行重構,摒棄了Zooke

資料收集DataX

DataX DataX是阿里開源的離線資料同步工具,可以實現包括 MySQL、Oracle、MongoDB、Hive、HDFS、HBase、Elasticsearch等各種異構資料來源之間的高效同步。 DataX原理 設計理念 為了解決異

資料收集binlog同步----Maxwell

簡介 Maxwell是由Java語言編寫,Zendesk開源的binlog解析同步工具。可通過簡單配置,將binlog解析並以json的格式同步到如file,kafka,redis,RabbitMQ等

1.大資料元件ELK過程安裝logstash-jdbc-input外掛

1.安裝logstash-jdbc-input外掛 安裝logstash的'jdbc連線檔案,首先需要安裝ruby,也是為了更好的使用ruby中的gem安裝外掛,下載地址如下: https://rubyinstaller.org/downloads/ (1)下面先寫一下ruby的安裝教程

ElasticsearchLogstash壓縮包安裝及同步mysql資料

Elasticsearch之Logstash壓縮包安裝及同步mysql資料 一:安裝logstash 一:安裝logstash 1.從官網下載.tar.gz壓縮包 下載地址為:https://www.elastic.co/do

Asp.netCore安裝centos7 資料收集

虛擬機器的安裝和centos的安裝看博友的文章:https://www.cnblogs.com/zhaopei/p/netcore.html   https://www.centos.org/ centos安裝netcore 步驟 https://dotnet.microsoft.com/l

Hadoop-No.15Flume基於事件的資料收集和處理

Flume是一種分散式的可靠開源系統,用於流資料的高效收集,聚集和移動.Flume通常用於移動日誌資料.但是也能移動大量事件資料.如社交媒體訂閱,訊息佇列事件或者網路流量資料. Flume架構

Logstash-資料收集、解析和轉存

本文作者:羅海鵬,叩丁狼高階講師。原創文章,轉載請註明出處。 前言 最近這幾年,大資料一直都是很熱門的話題,很多人會顧名思義的覺得,大資料就是你現在手上有大量的資料。從某個層面上來看,這確實是這樣的,但是所謂的大資料不僅僅只是有一堆的資料在這,更重要的是我們要如何解析

ELKlogstash系統日誌和nginx日誌收集-4

tps 字符串 方便 主設備號 add code shadow art index logstash常用參數  1 path   是必須的選項,每一個file配置,都至少有一個path   2 exclude   是不想監聽的文件,logstash會自動忽略該

日誌分析平臺ELK日誌收集logstash

  前文我們聊解了什麼是elk,elk中的elasticsearch叢集相關元件和叢集搭建以及es叢集常用介面的說明和使用,回顧請檢視考https://www.cnblogs.com/qiuhom-1874/p/13758006.html;今天我們來了解下ELK中的日誌收集器logstash;   logst

日誌分析平臺ELK日誌收集logstash常用外掛配置

  前文我們瞭解了logstash的工作流程以及基本的收集日誌相關配置,回顧請參考https://www.cnblogs.com/qiuhom-1874/p/13761906.html;今天我們來了解下logstash的常用input外掛和filter外掛的相關配置;   先說filter外掛吧,繼續上一篇部

JVM高級特性-三、垃圾收集判斷對象存活算法

地方法 size none ava 裏的 結束 靜態屬性 概述 span 一、概述   運行時數據區中,程序計數器、虛擬機棧、本地方法棧都是隨線程而生隨線程而滅的   因此,他們的內存分配和回收是確定的,在方法或線程結束時就回收。而Java堆和方   法區則是不確定的

WordPress資料收集,以後整理

tiny log word res utm nbsp dao 指定 html WordPress主題開發:實現分頁功能 http://www.cnblogs.com/tinyphp/p/6361901.html WordPress如何調取顯示指定文章 https://w

ELK LogStash

elk 日誌分析系統 logstatsh 一、簡介 Logstash 是開源的服務器端數據處理管道,能夠同時 從多個來源采集數據、轉換數據,然後將數據發送到您最喜歡的 “存儲庫” 中。(我們的存儲庫當然是 Elasticsearch。)可以用三個詞來概括Logstash:集中、轉換和

ELK 學習筆記 Logstash基本語法

技術 logs erl 定義 -s images cnblogs img ron Logstash基本語法: 處理輸入的input 處理過濾的filter 處理輸出的output 區域 數據類型 條件判斷 字段引用 區域: Logstash中,是用{}

ELK 學習筆記 Logstashfilter配置

表達式 under starting beginning name core dir date source Logstash之filter: json filter: input{ stdin{ } } filter{ jso