1. 程式人生 > 實用技巧 >Logstash相關配置與啟動

Logstash相關配置與啟動

  1. 首先進入到 Logstash/bin 資料夾下,我們的配置檔案是放在Logstash/bin/mysql/jdbc-timedtask.conf 檔案中。

    1. 所以我們的啟動命令是 ./logstash -f ./mysql/jdbc-timedtask.conf ,
    2. 如果是linux進行後臺啟動則需要執行 nohup ./logstash -f ./mysql/jdbc-timedtask.conf & ;
    3. 關閉Logstash 服務,則需要先去查詢 Logstash 的Pid。
      1. 命令是: ps -ef |grep logstash 獲取到執行的logstash 的Pid 。例如Pid為1218。通過使用kill -9 1218 進行關閉服務。
  2. 目錄結構:

    • Logstash
      • bin
        • logstash
        • logstash.bat
        • .........
        • mysql
          • jdbc.conf
          • mysql-connector-java-5.1.49.jar
          • jdbc-user.txt
          • jdbc-user.sql
          • jdbc-department.txt
          • jdbc-department.sql
  3. 配置檔案資訊 Logstash/bin/jdbc.conf:

    input {    
    	jdbc {
    		#索引的型別,識別符號。
    		type => "user"
    		# jdbc 驅動包位置
    		jdbc_driver_library => "./mysql/mysql-connector-java-5.1.49.jar"      
    		# 要使用的驅動包類
    		jdbc_driver_class => "com.mysql.jdbc.Driver"   
    		#配置jdbc地址,serverTimezone=Asia/Shanghai 時區設定為上海。
            jdbc_connection_string => "jdbc:mysql://localhost:3306/db?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"                
            # 資料庫使用者名稱
    		jdbc_user => "root"   
    		# 資料庫密碼
    		jdbc_password => "123456"        
    	    #定時任務 分別代表(分 時 天 月 年)。按需配置。
    	    schedule => "35 10 * * *"     
            #logstash從mysql同步資料,如果資料量太大時可以進行分頁同步。
            jdbc_paging_enabled => true
            #分頁大小
            jdbc_page_size => 100000
            #流式獲取資料,每次取10000
            jdbc_fetch_size => 10000
            #嘗試連線到資料庫的最大次數
            connection_retry_attempts => 3
            #連線嘗試之間休眠的秒數
            connection_retry_attempts_wait_time => 1
            #超時時間
            jdbc_pool_timeout => 5
            #是否強制欄位為小寫的形式
            lowercase_column_names => true
            #-------下面的配置是設定增量同步資料的-----
            #啟用資料庫欄位記錄增量
            use_column_value => true    
            #資料庫同步時按照那個欄位為依據進行增量同步。我用的是操作時間欄位。
            tracking_column => operatetime      
            #遞增欄位的型別,numeric 表示數值型別, timestamp 表示時間戳型別
            tracking_column_type => "timestamp"  
            ##儲存上次執行記錄,增量提取資料時使用
            record_last_run => true  
            #記錄上次的同步點,需要事先建立好,重啟時會讀取這個檔案,這個檔案可以手動修改
          	last_run_metadata_path => "./mysql/jdbc-user.txt"  
            #是否清除last_run_metadata_path的記錄,如果是true,則從頭開始查詢所有的資料庫記錄
            clean_run => false   
            #按照什麼語句進行增量同步資料。可以使用sql檔案,也可以使用sql語句,我這裡使用的是sql檔案。
            statement_filepath => "./mysql/timetask-user.sql"	
            # statement => "select * from user where operate_time >=:sql_last_value and operate_time <now()"     
            #設定時區,
          	jdbc_default_timezone => "Asia/Shanghai"    
    		}
    		#多表進行增量同步資料,可以和上面的配置一致。
            jdbc {     
    		type => "department"
    		jdbc_driver_library => "./mysql/mysql-connector-java-5.1.49.jar"  
    		jdbc_driver_class => "com.mysql.jdbc.Driver"   
            jdbc_connection_string =>  "jdbc:mysql://localhost:3306/db?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"  
    		jdbc_user => "root" 
    		jdbc_password => "123456"  
    	    schedule => "35 10 * * *"  
            jdbc_paging_enabled => true
            jdbc_page_size => 100000
            jdbc_fetch_size => 10000
            connection_retry_attempts => 3
            connection_retry_attempts_wait_time => 1
            jdbc_pool_timeout => 5
            lowercase_column_names => true
            #codec => plain {charset => "UTF-8"}
            use_column_value => true    
            tracking_column => id     
            #這裡的是以Id作為增量欄位的。型別是bigint,這裡使用的是numeric。
            tracking_column_type => "numeric"  
            record_last_run => true  
             last_run_metadata_path => "./mysql/jdbc-position.txt" 
            clean_run => false   
            statement_filepath => "./mysql/timetask-department.sql"	
          	jdbc_default_timezone => "Asia/Shanghai"    
    		}
    	}
    filter { 
        json {          
            source => "message" 
        } 
         date{
         		#格式化時間型別
                match => ["operatetime","yyy-MM-dd HH:mm:ss"]
            }
        if[type]=="company"{
        	#雖然使用了上海的時區,但是logstash同步資料的時候還是會差8個小時.
            ruby {   
                code => "event.set('operatetime', event.get('@timestamp').time.localtime + 8*60*60)"   
            }  
            ruby {  
                code => "event.set('@timestamp',event.get('operatetime'))"  
            }  
            mutate {  
                remove_field => ["timestamp"]  
            #  rename => { "[host][name]" => "host" }
            } 
        }   
    }
    output {    
    	if[type] == "user"{
    		elasticsearch{        
    		# es host : host,如果是叢集,就用“,”分開,例如["127.0.0.1:9200", "127.0.0.2:9200"] ,我這裡就一個節點。
            hosts => ["http://127.0.0.1:9200"]		
    		# 索引 ,logstash 的索引名稱。
    		index => "user"        
    		# logstash 的主鍵。
    		document_id => "%{id}"   
            document_type => "db" 
            #elasticsearch 設定密碼後,logstash需要登入時的賬號密碼。
            user => elastic
            password => 123456gw
    		}
    	}
    	if[type] == "department"{
    		elasticsearch{      
            hosts => ["http://127.0.0.1:9200"]		      
    		index => "department"        
    		document_id => "%{id}"    
    		document_type => "db"
    		user => elastic
            password => 123456gw
    		}
    	}
    }
    
  4. mysql-connector-java-5.1.49.jar

    1. 資料庫的驅動jar ,可以在網路上搜索下載。
  5. jdbc-user.sql

    select * from user where operate_time >=:sql_last_value and operate_time <now()
    
  6. jdbc-department.sql

    select * from department where id >=:sql_last_value
    
  7. jdbc-user.txt 資料庫中操作時間最小的是2019年的,所以設定為2019年。

--- !ruby/object:DateTime '2019-01-01 00:00:01.000000000 Z'
  1. jdbc-department.txt 資料庫中主鍵ID最小的是0年,所以設定為0。
--- 0