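1. Modify the Logstash configuration and see how Logstash uses it to parse logs:
Any file whose name ends in .conf will do, as shown in the figure below:
First, get an overview of the Logstash processing flow, as shown in the figure below:
Expressed in terms of the configuration file, the flow looks like this:
Then change into the bin directory and run the bat file, passing it the conf file to load.
Run from the bin directory, the command looks like this, and it starts successfully: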
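Logstash architecture:
From the queue settings in the opened file we can see the following:
When there are several different inputs, the queue acts as a buffer: the inputs write events into it, and the pipeline pulls its data from the queue rather than from the inputs directly.
Each pipeline can be thought of as a thread, and the pipelines run independently of one another.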
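The buffering idea described above can be sketched in a few lines of Python. This is only an illustrative model, not Logstash's actual implementation: the function name run_pipeline, the sentinel object, and the upper-casing "filter" are assumptions made for the example.

```python
import queue
import threading

def run_pipeline(events, workers=2):
    """Push events through an in-memory queue to a pool of worker threads."""
    q = queue.Queue()          # buffer between the "input" stage and the workers
    results = []
    lock = threading.Lock()
    stop = object()            # sentinel that tells a worker to exit

    def worker():
        while True:
            event = q.get()
            if event is stop:
                break
            processed = event.upper()      # stand-in for the filter stage
            with lock:
                results.append(processed)  # stand-in for the output stage

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for event in events:       # the "input" stage only writes into the queue
        q.put(event)
    for _ in threads:          # one sentinel per worker
        q.put(stop)
    for t in threads:
        t.join()
    return results

if __name__ == "__main__":
    # Order across workers is not guaranteed, so sort before printing.
    print(sorted(run_pipeline(["a", "b", "c"])))
```

The input side never calls the workers directly; it only puts events on the queue, which is what lets a slow filter or output stage lag behind without blocking the producer immediately.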
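Now look at the structure of pipelines.yml and some of the settings defined in it.
First, the location of pipelines.yml:
Then the definition format and paths inside pipelines.yml:
- pipeline.id: test   # ID of the pipeline
# pipeline.workers: 1   number of worker threads that run the filter and output stages of the pipeline; defaults to the number of CPU cores
# pipeline.batch.size: 1   number of events the batcher collects in one batch; the default is 125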
# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
This is the pipeline configuration itself (input, filter, output), given inline as a string.
# - pipeline.id: another_test
A second pipeline.
# queue.type: persisted
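How the queue is stored:
(1) The queue can be persisted to disk (persisted) or kept in memory (memory). In-memory events are discarded once they have been processed.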
# path.config: "D:\ELK\logstash\logstash-6.4.0\config\logstash.conf"
// path from which Logstash reads the pipeline configuration file
#
# Available options:
#
# # name of the pipeline
# pipeline.id: mylogs
#
# # The configuration string to be used by this pipeline
# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
#
# # The path from where to read the configuration text
# path.config: "/etc/conf.d/logstash/myconfig.cfg"
#
# # How many worker threads execute the Filters+Outputs stage of the pipeline
# pipeline.workers: 1 (actually defaults to number of CPUs)
#
# # How many events to retrieve from inputs before sending to filters+workers
# pipeline.batch.size: 125
#
# # How long to wait in milliseconds while polling for the next event
# # before dispatching an undersized batch to filters+outputs
# pipeline.batch.delay: 50
#
# # How many workers should be used per output plugin instance
# pipeline.output.workers: 1
#
# # Internal queuing model, "memory" for legacy in-memory based queuing and
# # "persisted" for disk-based acked queueing. Defaults is memory
queue.type: memory
#
# # If using queue.type: persisted, the page data files size. The queue data consists of
# # append-only data files separated into pages. Default is 64mb
queue.page_capacity: 64mb // size of each page file of the persisted queue; inside Logstash the queue serves as a buffer pool.
#
# # If using queue.type: persisted, the maximum number of unread events in the queue.
# # Default is 0 (unlimited)
queue.max_events: 0 // maximum number of unread events allowed in the queue (0 means unlimited)
#
# # If using queue.type: persisted, the total capacity of the queue in number of bytes.
# # Default is 1024mb or 1gb
queue.max_bytes: 1024mb // total capacity of the queue is 1024 MB
#
# # If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# # Default is 1024, 0 for unlimited
queue.checkpoint.acks: 1024 // maximum number of acked events (events confirmed as fully processed) before a checkpoint is forced
#
# # If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# # Default is 1024, 0 for unlimited
# queue.checkpoint.writes: 1024
#
# # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# # Default is 1000, 0 for no periodic checkpoint.
# queue.checkpoint.interval: 1000
#
# # Enable Dead Letter Queueing for this pipeline.
# dead_letter_queue.enable: false
#
# If using dead_letter_queue.enable: true, the maximum size of dead letter queue for this pipeline. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb
#
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
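To make the interplay of pipeline.batch.size and pipeline.batch.delay more concrete, here is a minimal Python sketch of a batcher. It is a simplified model under stated assumptions, not the Logstash implementation: take_batch is a hypothetical helper, and it uses one overall deadline per batch, whereas the comment in the file above describes the delay as the wait while polling for the next event.

```python
import queue
import time

def take_batch(q, batch_size=125, batch_delay_ms=50):
    """Collect up to batch_size events from q.

    If the queue runs dry, wait until batch_delay_ms has elapsed since the
    call started, then return the undersized batch.
    """
    batch = []
    deadline = time.monotonic() + batch_delay_ms / 1000.0
    while len(batch) < batch_size:
        # A timeout of 0 still returns an event if one is already queued.
        remaining = max(deadline - time.monotonic(), 0.0)
        try:
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            break  # delay elapsed: dispatch whatever has been collected
    return batch

if __name__ == "__main__":
    q = queue.Queue()
    for i in range(300):
        q.put(i)
    # 300 queued events with the defaults yield batches of 125, 125 and 50.
    print([len(take_batch(q)) for _ in range(3)])
```

A larger batch size generally means fewer, bigger hand-offs to the filter and output stages, at the cost of holding more events in memory per worker.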
Start Logstash with the input/output configuration file, as follows:
D:\ELK\logstash\logstash-6.4.0\bin>logstash -f ..\config\logstash.conf
This starts Logstash with the configuration file we wrote ourselves.
Sending Logstash logs to D:/ELK/logstash/logstash-6.4.0/logs which is now configured via log4j2.properties
Logging is handled by log4j2 from this point on.
[2018-11-13T14:12:08,471][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-11-13T14:12:08,929][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.4.0"}
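The runner is the program entry point and performs the startup.
In between there is also the queue stage, which buffers events coming in from multiple data sources.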
[2018-11-13T14:12:10,707][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
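// The pipeline is started here. Because a config file was passed with -f, pipelines.yml is ignored (see the WARN line above) and a single pipeline with the id "main" is started; pipeline_id is the ID of each pipeline.
pipeline.workers is the number of worker threads, which is 4 in this run.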
[2018-11-13T14:12:11,101][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2018-11-13T14:12:11,110][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:13,304][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:13,324][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://localhost:9200"]}
The stdin plugin is now waiting for input:
[2018-11-13T14:12:13,730][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x7eef08f3 run>"}
[2018-11-13T14:12:13,794][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-11-13T14:12:14,159][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-11-13T14:12:18,321][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:20,347][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:23,350][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:25,358][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:28,360][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:30,388][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:33,392][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-13T14:12:35,408][WARN ][logstash.outputs.elasticsearch] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://localhost:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://localhost:9200/][Manticore::SocketException] Connection refused: connect"}
[2018-11-13T14:12:38,411][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
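The startup log above follows a regular [timestamp][level][logger] message layout, so it is easy to summarize with a short script, for example to see how many WARN entries the unreachable Elasticsearch instance produced. The following is a rough sketch based only on the lines shown here; the names LINE_RE, parse_line and count_levels are made up for this example, and other Logstash versions or log4j2 layouts may need a different pattern.

```python
import re
from collections import Counter

# Pattern inferred from the log lines shown above, not from a format spec.
LINE_RE = re.compile(
    r"^\[(?P<timestamp>[^\]]+)\]"   # [2018-11-13T14:12:08,929]
    r"\[(?P<level>[A-Z]+)\s*\]"     # [INFO ] or [WARN ]
    r"\[(?P<logger>[^\]]+)\]"       # [logstash.runner ]
    r"\s*(?P<message>.*)$"
)

def parse_line(line):
    """Return a dict of fields for one log line, or None if it does not match."""
    match = LINE_RE.match(line.strip())
    if match is None:
        return None
    fields = match.groupdict()
    fields["logger"] = fields["logger"].strip()
    return fields

def count_levels(lines):
    """Count log entries per level, skipping lines that are not log entries."""
    parsed = (parse_line(line) for line in lines)
    return Counter(p["level"] for p in parsed if p is not None)

if __name__ == "__main__":
    sample = [
        '[2018-11-13T14:12:08,929][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.4.0"}',
        "The stdin plugin is now waiting for input:",
    ]
    print(count_levels(sample))
```

Lines that are not log entries, such as the stdin plugin prompt, are simply skipped.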