Logstash相關配置與啟動
阿新 • • 發佈:2020-12-23
-
首先進入到 Logstash/bin 資料夾下,我們的配置檔案是放在Logstash/bin/mysql/jdbc-timedtask.conf 檔案中。
- 所以我們的啟動命令是 ./logstash -f ./mysql/jdbc-timedtask.conf ,
- 如果是linux進行後臺啟動則需要執行 nohup ./logstash -f ./mysql/jdbc-timedtask.conf & ;
- 關閉Logstash 服務,則需要先去查詢 Logstash 的Pid。
- 命令是: ps -ef |grep logstash 獲取到執行的logstash 的Pid 。例如Pid為1218。通過使用kill -9 1218 進行關閉服務。
-
目錄結構:
- Logstash
- bin
- logstash
- logstash.bat
- .........
- mysql
- jdbc.conf
- mysql-connector-java-5.1.49.jar
- jdbc-user.txt
- jdbc-user.sql
- jdbc-department.txt
- jdbc-department.sql
- bin
- Logstash
-
配置檔案資訊 Logstash/bin/jdbc.conf:
input { jdbc { #索引的型別,識別符號。 type => "user" # jdbc 驅動包位置 jdbc_driver_library => "./mysql/mysql-connector-java-5.1.49.jar" # 要使用的驅動包類 jdbc_driver_class => "com.mysql.jdbc.Driver" #配置jdbc地址,serverTimezone=Asia/Shanghai 時區設定為上海。 jdbc_connection_string => "jdbc:mysql://localhost:3306/db?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai" # 資料庫使用者名稱 jdbc_user => "root" # 資料庫密碼 jdbc_password => "123456" #定時任務 分別代表(分 時 天 月 年)。按需配置。 schedule => "35 10 * * *" #logstash從mysql同步資料,如果資料量太大時可以進行分頁同步。 jdbc_paging_enabled => true #分頁大小 jdbc_page_size => 100000 #流式獲取資料,每次取10000 jdbc_fetch_size => 10000 #嘗試連線到資料庫的最大次數 connection_retry_attempts => 3 #連線嘗試之間休眠的秒數 connection_retry_attempts_wait_time => 1 #超時時間 jdbc_pool_timeout => 5 #是否強制欄位為小寫的形式 lowercase_column_names => true #-------下面的配置是設定增量同步資料的----- #啟用資料庫欄位記錄增量 use_column_value => true #資料庫同步時按照那個欄位為依據進行增量同步。我用的是操作時間欄位。 tracking_column => operatetime #遞增欄位的型別,numeric 表示數值型別, timestamp 表示時間戳型別 tracking_column_type => "timestamp" ##儲存上次執行記錄,增量提取資料時使用 record_last_run => true #記錄上次的同步點,需要事先建立好,重啟時會讀取這個檔案,這個檔案可以手動修改 last_run_metadata_path => "./mysql/jdbc-user.txt" #是否清除last_run_metadata_path的記錄,如果是true,則從頭開始查詢所有的資料庫記錄 clean_run => false #按照什麼語句進行增量同步資料。可以使用sql檔案,也可以使用sql語句,我這裡使用的是sql檔案。 statement_filepath => "./mysql/timetask-user.sql" # statement => "select * from user where operate_time >=:sql_last_value and operate_time <now()" #設定時區, jdbc_default_timezone => "Asia/Shanghai" } #多表進行增量同步資料,可以和上面的配置一致。 jdbc { type => "department" jdbc_driver_library => "./mysql/mysql-connector-java-5.1.49.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/db?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai" jdbc_user => "root" jdbc_password => "123456" schedule => "35 10 * * *" jdbc_paging_enabled => true jdbc_page_size => 100000 jdbc_fetch_size => 10000 connection_retry_attempts => 3 connection_retry_attempts_wait_time => 1 jdbc_pool_timeout => 5 lowercase_column_names => true #codec => plain {charset => "UTF-8"} use_column_value => true tracking_column => id #這裡的是以Id作為增量欄位的。型別是bigint,這裡使用的是numeric。 tracking_column_type => "numeric" record_last_run => true last_run_metadata_path => "./mysql/jdbc-position.txt" clean_run => false statement_filepath => "./mysql/timetask-department.sql" jdbc_default_timezone => "Asia/Shanghai" } } filter { json { source => "message" } date{ #格式化時間型別 match => ["operatetime","yyy-MM-dd HH:mm:ss"] } if[type]=="company"{ #雖然使用了上海的時區,但是logstash同步資料的時候還是會差8個小時. ruby { code => "event.set('operatetime', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('operatetime'))" } mutate { remove_field => ["timestamp"] # rename => { "[host][name]" => "host" } } } } output { if[type] == "user"{ elasticsearch{ # es host : host,如果是叢集,就用“,”分開,例如["127.0.0.1:9200", "127.0.0.2:9200"] ,我這裡就一個節點。 hosts => ["http://127.0.0.1:9200"] # 索引 ,logstash 的索引名稱。 index => "user" # logstash 的主鍵。 document_id => "%{id}" document_type => "db" #elasticsearch 設定密碼後,logstash需要登入時的賬號密碼。 user => elastic password => 123456gw } } if[type] == "department"{ elasticsearch{ hosts => ["http://127.0.0.1:9200"] index => "department" document_id => "%{id}" document_type => "db" user => elastic password => 123456gw } } }
-
mysql-connector-java-5.1.49.jar
- 資料庫的驅動jar ,可以在網路上搜索下載。
-
jdbc-user.sql
select * from user where operate_time >=:sql_last_value and operate_time <now()
-
jdbc-department.sql
select * from department where id >=:sql_last_value
-
jdbc-user.txt 資料庫中操作時間最小的是2019年的,所以設定為2019年。
--- !ruby/object:DateTime '2019-01-01 00:00:01.000000000 Z'
- jdbc-department.txt 資料庫中主鍵ID最小的是0年,所以設定為0。
--- 0