
1. Modifying Logstash configuration options, and parsing logs with those configurations:

Any file whose name ends in .conf will do, as in the screenshot below:

First, an overview of the Logstash processing flow: events move from input, through filter, to output.

Expressed in terms of the configuration file, the flow looks like this:
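The original diagram is not reproduced here; as a minimal sketch of what such a .conf file usually looks like (the stdin/grok/stdout plugins and the Apache log pattern are illustrative choices, not taken from the original post):

input {
  stdin { }                                              # read raw lines from the console
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }     # parse an Apache-style access log line into fields
  }
}

output {
  stdout { codec => rubydebug }                          # print the parsed event
}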

Then go into the bin directory and run the .bat file with this conf file loaded.

Run it from the bin directory as follows, and it starts successfully;
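On Windows this amounts to running something like the following from the bin directory (the install path shown is the one used later in this post; adjust it to your own):

D:\ELK\logstash\logstash-6.4.0\bin>logstash.bat -f ..\config\logstash.conf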

 

Logstash architecture design:

From the queue shown in this file we can see: if there are several different inputs, the queue acts as a buffer that distributes events to the different pipelines, and each pipeline pulls its input data from the queue.

Each pipeline can be thought of as a thread, and the threads are independent of one another (a concrete multi-pipeline example is sketched after the commented file below).

Let's look at the structure of the pipelines and the related configuration files.

First, the location of pipelines.yml:

Then the definition format and paths inside pipelines.yml:

 

 - pipeline.id: test                  # pipeline (thread) id
#   pipeline.workers: 1               # number of worker threads for the filter+output stages; defaults to the number of CPU cores
#   pipeline.batch.size: 1            # number of pending events the Batcher fetches per batch; default is 125
#   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
#                                     # the input/filter/output configuration, written inline as a string
# - pipeline.id: another_test         # a second, independent pipeline
#   queue.type: persisted             # queue persistence: "persisted" writes to disk, "memory" keeps events in memory (in-memory events are gone once dispatched)
#    path.config: "D:\ELK\logstash\logstash-6.4.0\config\logstash.conf"
#                                     # the path from which Logstash reads the pipeline's configuration file
#
# Available options:
#
#   # name of the pipeline
#   pipeline.id: mylogs
#
#   # The configuration string to be used by this pipeline
#   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
#
#   # The path from where to read the configuration text
#   path.config: "/etc/conf.d/logstash/myconfig.cfg"
#
#   # How many worker threads execute the Filters+Outputs stage of the pipeline
#   pipeline.workers: 1 (actually defaults to number of CPUs)
#
#   # How many events to retrieve from inputs before sending to filters+workers
#   pipeline.batch.size: 125
#
#   # How long to wait in milliseconds while polling for the next event
#   # before dispatching an undersized batch to filters+outputs
#   pipeline.batch.delay: 50
#
#   # How many workers should be used per output plugin instance
#   pipeline.output.workers: 1
#
#   # Internal queuing model, "memory" for legacy in-memory based queuing and
#   # "persisted" for disk-based acked queueing. Default is memory
#   queue.type: memory
#
#   # If using queue.type: persisted, the page data files size. The queue data consists of
#   # append-only data files separated into pages. Default is 64mb
   queue.page_capacity: 64mb          # queue page capacity; inside Logstash the queue serves as the buffer pool
#
#   # If using queue.type: persisted, the maximum number of unread events in the queue.
#   # Default is 0 (unlimited)
   queue.max_events: 0                # maximum number of unread events the queue may hold (0 = unlimited)
#
#   # If using queue.type: persisted, the total capacity of the queue in number of bytes.
#   # Default is 1024mb or 1gb
   queue.max_bytes: 1024mb            # maximum total capacity of the queue; 1024 MB here
#
#   # If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
#   # Default is 1024, 0 for unlimited
    queue.checkpoint.acks: 1024       # number of acked events before a checkpoint is forced, i.e. the ack-based confirmation that events were processed
#
#   # If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
#   # Default is 1024, 0 for unlimited
#   queue.checkpoint.writes: 1024
#
#   # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
#   # Default is 1000, 0 for no periodic checkpoint.
#   queue.checkpoint.interval: 1000
#
#   # Enable Dead Letter Queueing for this pipeline.
#   dead_letter_queue.enable: false
#
#   If using dead_letter_queue.enable: true, the maximum size of dead letter queue for this pipeline. Entries
#   will be dropped if they would increase the size of the dead letter queue beyond this setting.
#   Default is 1024mb
#   dead_letter_queue.max_bytes: 1024mb
#
#   If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
#   Default is path.data/dead_letter_queue
#
#   path.dead_letter_queue:
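Putting these options together, a minimal pipelines.yml with two independent pipelines might look like the sketch below; the pipeline ids, the path and the inline config string are illustrative, not taken from the original post:

- pipeline.id: apache_logs
  pipeline.workers: 2
  pipeline.batch.size: 125
  queue.type: persisted                      # buffer events on disk so they survive a restart
  queue.max_bytes: 1024mb
  path.config: "D:/ELK/logstash/logstash-6.4.0/config/logstash.conf"

- pipeline.id: heartbeat
  pipeline.workers: 1
  queue.type: memory                         # keep this small pipeline purely in memory
  config.string: "input { generator { count => 10 } } output { stdout { codec => dots } }"

Each entry defines one pipeline; they run in the same JVM but independently of one another, each with its own queue and worker threads.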

 

 

 

Now start Logstash with the input/output configuration file, as shown below:

D:\ELK\logstash\logstash-6.4.0\bin>logstash -f ..\config\logstash.conf

This starts Logstash with the configuration file we wrote ourselves.
Sending Logstash logs to D:/ELK/logstash/logstash-6.4.0/logs which is now configured via log4j2.properties

Logging is handled by log4j2, which records the following output.
[2018-11-13T14:12:08,471][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-11-13T14:12:08,929][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.4.0"}

The runner is the program entry point that drives the startup.

In between there is also the queue stage, which buffers data coming in from multiple sources.
[2018-11-13T14:12:10,707][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}

Because we passed -f on the command line, pipelines.yml is ignored (see the WARN above) and a single pipeline named "main" is started; pipeline_id is the id of this pipeline task.

pipeline.workers is the number of worker threads, i.e. the threads that execute the filter and output stages; it defaults to the number of CPU cores, 4 here.
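These per-pipeline settings can also be overridden on the command line when running a single config file; for example (the values here are purely illustrative):

D:\ELK\logstash\logstash-6.4.0\bin>logstash.bat -f ..\config\logstash.conf -w 2 -b 250

where -w (--pipeline.workers) sets the number of worker threads and -b (--pipeline.batch.size) sets the batch size.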


[2018-11-13T14:12:11,101][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2018-11-13T14:12:11,110][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:13,304][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:13,324][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}
The stdin plugin is now waiting for input:
[2018-11-13T14:12:13,730][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x7eef08f3 run>"}
[2018-11-13T14:12:13,794][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-11-13T14:12:14,159][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-11-13T14:12:18,321][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:20,347][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:23,350][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:25,358][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:28,360][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:30,388][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:33,392][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:35,408][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:38,411][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
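The repeated "Connection refused" warnings simply mean that Elasticsearch was not running yet at http://localhost:9200 when Logstash started; once Elasticsearch is up, the health check succeeds and events can be indexed. Judging from this log (the stdin plugin waiting for input, and an Elasticsearch output pointed at localhost:9200), the logstash.conf loaded here was presumably something close to the following sketch; the index name is a hypothetical placeholder:

input {
  stdin { }                                  # read events typed into the console
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]       # matches the URL seen in the health-check log above
    index => "logstash-test"                 # hypothetical index name, not from the original post
  }
}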

 

 

 

 
