資料收集之Logstash
Logstash
之前用的Logstash快忘了,好記性不如爛筆頭,好好總結一下。
Logstash由Java(Core)+Ruby(Plugin)語言編寫,是一個開源的日誌收集、處理、轉發工具。
Input產生事件,Filter修改事件,Output將事件傳送到其他地方。
注:經常看到的編解碼器codec也是過濾器,可以作為輸入或輸出的一部分,對流進行編解碼。
簡單部署
#下載解壓
[[email protected] ~/software]$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2 .tar.gz
[[email protected] ~/software]$ tar -zxvf logstash-6.3.2.tar.gz
#自檢-接收標準輸入並輸出到控制檯
[[email protected] ~/software/logstash-6.3.2]$ bin/logstash -e 'input { stdin { } } output { stdout {} }'
輸入 Hello
輸出
{
"@version" => "1",
"message" => "Hello",
"@timestamp" => 2018-08-11T11:49:31.059Z,
"host" => "localhost.local"
}
Mysql增量同步至 Kafka/Hdfs/Elasticsearch
Mysql 測試庫表
生產中一個庫同一類資料一般會有多張分表如16張分表,256張分表。
#建庫 user_logs
create database user_logs;
#建表-使用者瀏覽日誌表: user_browse_0_logs,user_browse_1_logs 如
CREATE TABLE `user_browse_0_logs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`product_id` int(11) NOT NULL,
`event_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`uid` varchar(10) NOT NULL,
PRIMARY KEY (`id`),
KEY `event_time` (`event_time`) USING BTREE,
KEY `uid_event_time` (`uid`,`event_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='使用者瀏覽日誌表';
Logstash 配置
基礎配置
#配置目錄
[wangpei@localhost ~/software/logstash-6.3.2]$ mkdir db_test
#拷貝預設配置
[wangpei@localhost ~/software/logstash-6.3.2]$ cp config/* db_test/
#db_test/jvm.options
#jvm配置
-Xms1g
-Xmx1g
#db_test/logstash.yml
#持久化目錄
#logstash和plugin任何需要持久化的資料都會放在這個目錄
path.data: /Users/wangpei/software/logstash-6.3.2/data/user_browse_logs/data
#queue
#預設memory,logstash程序重啟會丟資料
queue.type: persisted
#併發度
pipeline.workers: 2
#每個執行緒從輸入收集的最大數量
pipeline.batch.size: 125
Input-Filter-Output配置
[wangpei@localhost ~/software/logstash-6.3.2]$ vim db_test/user_browse_logs.conf
#輸入配置 jdbc外掛
input {
jdbc {
#jdbc驅動
jdbc_driver_library => "/Users/wangpei/software/logstash-6.3.2/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#jdbc連線
jdbc_connection_string => "jdbc:mysql://node2:3306/user_logs"
jdbc_user => "root"
jdbc_password => "111"
#任務排程
schedule => "* * * * *"
#上次執行的sql_last_value儲存路徑
#啟動前需要確保該檔案已存在
last_run_metadata_path => "/Users/wangpei/software/logstash-6.3.2/data/test/metadata/user_browse_0_logs"
#sql語句
#按主鍵自增id 增量同步
#input在拿到欄位為日期型別的資料時,一定會減8小時並帶上時區資訊
#如原始mysql event_time欄位為日期型別(這裡是timestamp型別)的資料2018-08-12 14:38:24 =>經過input後=> 2018-08-12T14:38:24.000Z
#不想讓input修改原始資料,這裡直接把event_time轉成成字串型別。
statement => "SELECT id,product_id,DATE_FORMAT(event_time, '%Y-%m-%d %H:%i:%s') as event_time,uid FROM user_browse_0_logs WHERE id > :sql_last_value"
#設定成true 用tracking_column列的值去更新上次記錄的sql_last_value值
use_column_value => true
#追蹤的列
tracking_column => "id"
#追蹤的列的資料型別 支援["numeric", "timestamp"]
tracking_column_type => "numeric"
#外掛id 便於監控
id => "user_browse_logs_input_jdbc"
#新增欄位 表名 便於後期DQC 如mysql 每個分表的資料和HDFS每個分割槽資料count、sum驗證
add_field => {"table_name" => "user_browse_0_logs"}
#input event輸出格式 每個event轉換成json+換行符格式
codec => json_lines {charset => "UTF-8"}
}
}
#Filter配置
filter {
#預設會對每條資料加上@timestamp
#不指定@timestamp欄位來源,預設會以logstash伺服器收到資料時間為準
date {
#匹配到這種格式就會替換成目標欄位,目標欄位預設是@timestamp
#貌似不管怎麼設定,Logstash中 @timestamp永遠是0時區
#@timestamp 0時區,資料寫入ES,@timestamp 在Kibana上顯示時會根據瀏覽器市區自動調整 如自動+8
match => ["event_time" , "yyyy-MM-dd HH:mm:ss"]
#timezone => "Asia/Shanghai"
#target => "@timestamp"
}
#增加dt欄位,該欄位來源於event_time,用於hdfs按天分割槽、ES按天索引
#總結:
#假設源資料是北京時間
#1)資料同步到ES:(需要兼顧kibana各種作圖、Java/Python API查詢)
# A、按北京時間建索引如下dt
# B、kibana視覺化,時間選擇用@timestamp時間欄位,避免用event_time時間欄位做不了Date Histogram問題。
# C、ES API查詢,時間選擇用event_time時間欄位更方便。
#2)資料同步到HDFS: 按北京時間分割槽 如下dt即可
ruby {code => "event.set('dt', event.get('event_time')[0,10].gsub('-',''));"}
}
#輸出配置
output {
#輸出到控制檯
stdout { codec => rubydebug}
#輸出到kafka
kafka {
#kafka brokers
bootstrap_servers => "localhost:9092"
#topic
topic_id => "testTopic3"
#ack
acks => "1"
#輸出格式
codec => json
#id
id => "user_browse_logs_ouput_kafka"
}
#輸出到HDFS
#需要開啟HDFS 的webhdfs
webhdfs {
codec => json_lines {charset => "UTF-8"}
#namenode
host => 'node1'
port => 50070
#standby namenode
standby_host => 'node4'
standby_port => 50070
#user
user => 'root'
#user需要對path有讀寫許可權
path => '/data/user_logs/user_browse_logs/%{dt}/%{table_name}.log'
#攢到多少條event時向hdfs寫一次資料
flush_size => 3
#間隔多久向hdfs寫一次資料 單位:秒
idle_flush_time => 2
}
#輸出到elasticsearch
elasticsearch {
#es節點列表
hosts => ["localhost:9200"]
#索引使用的模板
#模板中定義了mapping(日期Format,某個欄位使用的分詞器)和setting
template_name => "template.user_browse_logs"
#index
#資料是北京時間,這個按北京時間建索引
index => "user_browse_logs.%{dt}"
#type
document_type =>"browse_logs"
#許可權
user => "logstash_input"
password => "logstash_input"
codec => json {charset => "UTF-8"}
}
}
啟動方式
#直接啟動===> 一個input-filter-output配置獨享一個logstash程序
[[email protected] ~/software/logstash-6.3.2]$ bin/logstash --path.settings db_test/ -f db_test/user_browse_logs.conf --config.reload.automatic
#pipeline方式啟動===> 多個input-filter-output配置共享一個logstash程序
[[email protected] ~/software/logstash-6.3.2]$ bin/logstash --path.settings db_test/ --config.reload.automatic
需配置db_test/pipelines.yml
配置多個pipeline。每個pipeline可對應一個表(input-filter-output)。
- pipeline.id: user_logs.user_browse_logs
queue.type: persisted
pipeline.workers: 1
queue.max_bytes: 32mb
path.config: "/Users/wangpei/software/logstash-6.3.2/db_test/user_browse_logs.conf"
- pipeline.id: user_logs.user_pay_logs
queue.type: persisted
pipeline.workers: 1
queue.max_bytes: 32mb
path.config: "/Users/wangpei/software/logstash-6.3.2/db_test/user_pay_logs.conf"
...
結果
mysql插入資料
insert into user_browse_0_logs(product_id,uid) values(1,"0");
insert into user_browse_0_logs(product_id,uid) values(2,"0");
insert into user_browse_0_logs(product_id,uid) values(3,"0");
mysql> select * from user_browse_0_logs;
+----+------------+---------------------+-----+
| id | product_id | event_time | uid |
+----+------------+---------------------+-----+
| 1 | 1 | 2018-08-12 20:18:31 | 0 |
| 2 | 2 | 2018-08-12 20:18:31 | 0 |
| 3 | 3 | 2018-08-12 20:18:32 | 0 |
+----+------------+---------------------+-----+
kafka收到的資料
[wangpei@localhost ~/software/logstash-6.2.4]$ kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic3
{"@version":"1","dt":"20180812","product_id":3,"uid":"0","id":3,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:32","@timestamp":"2018-08-12T12:18:32.000Z"}
{"@version":"1","dt":"20180812","product_id":1,"uid":"0","id":1,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:31","@timestamp":"2018-08-12T12:18:31.000Z"}
{"@version":"1","dt":"20180812","product_id":2,"uid":"0","id":2,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:31","@timestamp":"2018-08-12T12:18:31.000Z"}
hdfs收到的資料
#分割槽:20180812
#分割槽下的檔案:user_browse_0_logs.log user_browse_0_logs是表名
#該路徑來自:output webhdfs path引數 path => '/data/user_logs/user_browse_logs/%{dt}/%{table_name}.log'
[root@node2 /root]# hdfs dfs -cat /data/user_logs/user_browse_logs/20180812/user_browse_0_logs.log
{"@version":"1","dt":"20180812","product_id":1,"uid":"0","id":1,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:31","@timestamp":"2018-08-12T12:18:31.000Z"}
{"@version":"1","dt":"20180812","product_id":2,"uid":"0","id":2,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:31","@timestamp":"2018-08-12T12:18:31.000Z"}
{"@version":"1","dt":"20180812","product_id":3,"uid":"0","id":3,"table_name":"user_browse_0_logs","event_time":"2018-08-12 20:18:32","@timestamp":"2018-08-12T12:18:32.000Z"}
es收到的資料
檢視jdbc偏移量記錄檔案
#該路徑來自input jdbc外掛 last_run_metadata_path引數
last_run_metadata_path => "/Users/wangpei/software/logstash-6.3.2/data/test/metadata/user_browse_0_logs"
#此時id的偏移量是3,下次取id>3的資料。這樣就實現了增量同步
[wangpei@localhost ~/software/logstash-6.3.2]$ cat /Users/wangpei/software/logstash-6.3.2/data/test/metadata/user_browse_0_logs
--- 3
檢視logstash持久化的資料
#持久化目錄下包含2個目錄,一個檔案
#該目錄來自於logstash.yml中 path.data: /Users/wangpei/software/logstash-6.3.2/data/user_browse_logs/data
[wangpei@localhost ~/software/logstash-6.3.2]$ ls -l /Users/wangpei/software/logstash-6.3.2/data/user_browse_logs/data
total 8
drwxr-xr-x 2 wangpei staff 68 8 12 19:38 dead_letter_queue
drwxr-xr-x 4 wangpei staff 136 8 12 19:57 queue
-rw-r--r-- 1 wangpei staff 36 8 12 19:38 uuid
#uuid:logstash agent uuid
#queue:input → queue → filter + output。queue配置為persisted時,input會把資料先寫入此持久化目錄,然後再由filter + output處理。
#dead_letter_queue:DLQ佇列。死信佇列儲存響應程式碼為400或404的情況,兩者都表示無法重試的事件。
如資料input=>filter=>output(es),
一條資料出現mapping error,此時如果在logstash.yml中配置了dead_letter_queue.enable: true,
就會將這條資料放在DLQ對應的持久化目錄中。然後可以通過DLQ(Dead Letter Queues) Input => Filter(再處理)=>Output ES。
附:Kafka Input
input{
kafka{
codec => json {charset => "UTF-8"}
#消費者組
group_id => "test_topic_consumer1"
#從最新開始消費
auto_offset_reset => "latest"
#消費的topics
topics => ["test_topic"]
#bootstrap server
bootstrap_servers => "localhost:9092"
#注意:執行緒數最好與partition數目一樣多,超過partition數量的執行緒會閒置。
#kafka topic 一個partition同一時刻只能有一個執行緒去消費。
consumer_threads =>3
}
}
Logstash監控
可通過X-Pack以UI的方式檢視監控資訊,也可通過監控API檢視。
監控API主要有四類:節點資訊、外掛資訊、節點狀態統計、熱執行緒統計。
- 節點資訊
介面返回os、jvm、pipelines配置資訊 如jvm最大最小堆、pipelines worker數。
http://localhost:9600/_node/<os/pipelines/jvm>?pretty
- 外掛資訊
列出所有可用外掛。
http://localhost:9600/_node/plugins?pretty
- 節點狀態統計
http://localhost:9600/_node/stats/<type>?pretty
type=jvm 獲取jvm統計資訊。如jvm當前執行緒數、執行緒峰值、記憶體各區(young、old、survivor)記憶體使用情況、gc次數、gc時間
type=process 獲取程序統計資訊。如記憶體消耗和CPU使用情況
type=events 獲取event相關統計資訊。如輸入、filter、輸出中的event數目
type=pipelines 獲取每個pipeline統計資訊、
- 熱執行緒統計
獲取當前熱執行緒。熱執行緒是一個Java執行緒,具有較高的CPU使用率並且執行時間超過正常時間。
http://localhost:9600/_node/hot_threads?pretty
注意
- 一個input多個output
如一個jdbc,多個output(hdfs、kafka、es),kafka和es連線不上,input會繼續輸入,偏移量如自增id會繼續增加,配置了queue.type: persisted,output plugin kafka/es正常後,會重發歷史資料到kafka/es。原因:input在確認資料發往queue後就返回了。
- jdbc全量匯入
去掉jdbc sql語句中where條件即可。
- jdbc增量匯入給從庫同步留下時間
#如果logstash基於從庫按自增id做增量同步,由於事務提交順序並不一定是按id從小打到的順序,id雖然是自增id,但id=100的資料可能先於id=96的資料入庫。
#如若此時logstash同步了id=100的資料(id=96的資料還未入庫),那sql_last_value就一定會>=100,下次再導的時候,只會取id>sql_last_value的資料,就會丟失了id=96這條資料。
#解決方式:給從庫同步留下時間,最大隻導1分鐘之前的資料(從庫同步延遲在毫秒級別),即等id=96也入庫後再導。
#實踐中,注意從庫的延遲。如果從庫延遲較大,也應適當增加等待間隔。
#statement改為:
statement => "SELECT id,product_id,DATE_FORMAT(event_time, '%Y-%m-%d %H:%i:%s') as event_time,uid FROM user_browse_0_logs WHERE id > :sql_last_value and event_time < date_sub(now(),interval 1 minute)"
相關推薦
資料收集之Logstash
Logstash 之前用的Logstash快忘了,好記性不如爛筆頭,好好總結一下。 Logstash由Java(Core)+Ruby(Plugin)語言編寫,是一個開源的日誌收集、處理、轉發工具。 Input產生事件,Filter修改事件,Output將事件傳
第三節 ElasticSearch資料匯入之Logstash
一、簡介 Logstash 是一款強大的資料處理工具,它可以實現資料傳輸,格式處理,格式化輸出,還有強大的外掛功能,常用於日誌處理、或一些具有一定格式的資料匯入到ES的處理。 工作流程 Logstash 工作的三個階段:
分散式日誌收集之Logstash 筆記(二)
今天是2015年11月06日,早上起床,北京天氣竟然下起了大雪,不錯,最近幾年已經很少見到雪了,想起小時候冬天的樣子,回憶的影子還是歷歷在目。 進入正題吧,上篇介紹了Logstash的基礎知識和入門demo,本篇介紹幾個比較常用的命令和案例 通過上篇介紹,我們大體知道了整個logstash處理日誌的流程
資料收集之Filebeat
Filebeat採用Go語言開發,也可用於日誌收集,相較於的Logstash,更輕量,資源佔用更少。一般部署在日誌收集的最前端。 本文基於Filebeat 6.3.2總結。 設計要點 主要元件 Filebeat主要由兩大元件組成:
資料收集之Flume
Flume最初由Cloudera開發,於2011年6月貢獻給Apache,於2012成為頂級專案。在孵化這一年,基於老版本的Flume(Flume OG:Flume Original Generation 即Flume 0.9.x版本)進行重構,摒棄了Zooke
資料收集之DataX
DataX DataX是阿里開源的離線資料同步工具,可以實現包括 MySQL、Oracle、MongoDB、Hive、HDFS、HBase、Elasticsearch等各種異構資料來源之間的高效同步。 DataX原理 設計理念 為了解決異
資料收集之binlog同步----Maxwell
簡介 Maxwell是由Java語言編寫,Zendesk開源的binlog解析同步工具。可通過簡單配置,將binlog解析並以json的格式同步到如file,kafka,redis,RabbitMQ等
1.大資料元件之ELK過程之安裝logstash-jdbc-input外掛
1.安裝logstash-jdbc-input外掛 安裝logstash的'jdbc連線檔案,首先需要安裝ruby,也是為了更好的使用ruby中的gem安裝外掛,下載地址如下: https://rubyinstaller.org/downloads/ (1)下面先寫一下ruby的安裝教程
Elasticsearch之Logstash壓縮包安裝及同步mysql資料
Elasticsearch之Logstash壓縮包安裝及同步mysql資料 一:安裝logstash 一:安裝logstash 1.從官網下載.tar.gz壓縮包 下載地址為:https://www.elastic.co/do
Asp.netCore之安裝centos7 資料收集
虛擬機器的安裝和centos的安裝看博友的文章:https://www.cnblogs.com/zhaopei/p/netcore.html https://www.centos.org/ centos安裝netcore 步驟 https://dotnet.microsoft.com/l
Hadoop-No.15之Flume基於事件的資料收集和處理
Flume是一種分散式的可靠開源系統,用於流資料的高效收集,聚集和移動.Flume通常用於移動日誌資料.但是也能移動大量事件資料.如社交媒體訂閱,訊息佇列事件或者網路流量資料. Flume架構
Logstash-資料收集、解析和轉存
本文作者:羅海鵬,叩丁狼高階講師。原創文章,轉載請註明出處。 前言 最近這幾年,大資料一直都是很熱門的話題,很多人會顧名思義的覺得,大資料就是你現在手上有大量的資料。從某個層面上來看,這確實是這樣的,但是所謂的大資料不僅僅只是有一堆的資料在這,更重要的是我們要如何解析
ELK之logstash系統日誌和nginx日誌收集-4
tps 字符串 方便 主設備號 add code shadow art index logstash常用參數 1 path 是必須的選項,每一個file配置,都至少有一個path 2 exclude 是不想監聽的文件,logstash會自動忽略該
日誌分析平臺ELK之日誌收集器logstash
前文我們聊解了什麼是elk,elk中的elasticsearch叢集相關元件和叢集搭建以及es叢集常用介面的說明和使用,回顧請檢視考https://www.cnblogs.com/qiuhom-1874/p/13758006.html;今天我們來了解下ELK中的日誌收集器logstash; logst
日誌分析平臺ELK之日誌收集器logstash常用外掛配置
前文我們瞭解了logstash的工作流程以及基本的收集日誌相關配置,回顧請參考https://www.cnblogs.com/qiuhom-1874/p/13761906.html;今天我們來了解下logstash的常用input外掛和filter外掛的相關配置; 先說filter外掛吧,繼續上一篇部
JVM高級特性-三、垃圾收集之判斷對象存活算法
地方法 size none ava 裏的 結束 靜態屬性 概述 span 一、概述 運行時數據區中,程序計數器、虛擬機棧、本地方法棧都是隨線程而生隨線程而滅的 因此,他們的內存分配和回收是確定的,在方法或線程結束時就回收。而Java堆和方 法區則是不確定的
WordPress資料收集,以後整理
tiny log word res utm nbsp dao 指定 html WordPress主題開發:實現分頁功能 http://www.cnblogs.com/tinyphp/p/6361901.html WordPress如何調取顯示指定文章 https://w
ELK 之 LogStash
elk 日誌分析系統 logstatsh 一、簡介 Logstash 是開源的服務器端數據處理管道,能夠同時 從多個來源采集數據、轉換數據,然後將數據發送到您最喜歡的 “存儲庫” 中。(我們的存儲庫當然是 Elasticsearch。)可以用三個詞來概括Logstash:集中、轉換和
ELK 學習筆記之 Logstash基本語法
技術 logs erl 定義 -s images cnblogs img ron Logstash基本語法: 處理輸入的input 處理過濾的filter 處理輸出的output 區域 數據類型 條件判斷 字段引用 區域: Logstash中,是用{}
ELK 學習筆記之 Logstash之filter配置
表達式 under starting beginning name core dir date source Logstash之filter: json filter: input{ stdin{ } } filter{ jso