[logstash-input-http] 外掛使用詳解
阿新 • • 發佈:2018-12-31
外掛介紹
Http外掛是2.0版本才出現的新外掛,1.x是沒有這個外掛的。這個外掛可以幫助logstash接收其他主機或者本機發送的http報文。
外掛的原理很簡單,它自己啟動了一個ruby的伺服器,用於接收Http請求。然後會把host(IP地址)和header相關的資訊新增到event中。
下面就看看這個外掛如何使用吧!
基本配置
先看看預設的配置吧!
http {}
簡單到心碎啊!其實有很多引數都是預設的...
上面的配置其實相當於:
http{ host => "0.0.0.0" port => 8080 additional_codecs => {"application/json"=>"json"} codec => "plain" threads => 4 ssl => false }
引數詳解
最主要的幾個引數,也是Http中常見的屬性:
host
預設是0.0.0.0,即所有的地址都可以傳送到本機,從而接收Http資訊。
port
是http外掛中伺服器執行的埠號。只要傳送到“本機IP”:"該埠號"
的資料都可以被http外掛接收到。
additional_codecs
配置文字型別和codec的對映,如上面所示,預設配置了json文字對應使用json的codec。
codec
如果上面的對映集合中找不到文字型別對應的codec,那麼會預設按照這個屬性配置的codec解析。
ssl
是否開啟SSL。
threads
ruby外掛中伺服器的啟動執行緒,這裡預設是4個。
user、password、keystore、keystore_password
這些都與http的認證有關係了,就不多說了。如果想要使用,再去參考文件吧!
原始碼初探
閱讀外掛的原始碼是為了更好的理解外掛的使用,並且在出錯的時候知道哪裡出現了問題。Logstash的外掛往往都有固定的書寫格式,因此很容易看到外掛的核心程式碼。
在Input外掛中,主要包含兩個方法:
public def register # register方法相當於初始化的構造方法 end # def register # 主要的核心業務方法都在run中 def run(queue) Stud.interval(@interval) do # 建立事件 event = LogStash::Event.new("message" => @message, "host" => @host) # 裝飾event物件 decorate(event) # 放入佇列中 queue << event end # loop end # def run
下面看看http的外掛內容吧!
首先看看register都做了什麼吧!
def register
require "logstash/util/http_compressed_requests"
# 建立Puma伺服器
@server = ::Puma::Server.new(nil) # we'll set the rack handler later
# 下面的都是跟認證相關的....
if @user && @password then
token = Base64.strict_encode64("#{@user}:#{@password.value}")
@auth_token = "Basic #{token}"
end
if @ssl
if @keystore.nil? || @keystore_password.nil?
raise(LogStash::ConfigurationError, "Settings :keystore and :keystore_password are required because :ssl is enabled.")
end
ctx = Puma::MiniSSL::Context.new
ctx.keystore = @keystore
ctx.keystore_pass = @keystore_password.value
ctx.verify_mode = case @verify_mode
when 'peer'
Puma::MiniSSL::VERIFY_PEER
when 'force_peer'
Puma::MiniSSL::VERIFY_PEER | Puma::MiniSSL::VERIFY_FAIL_IF_NO_PEER_CERT
when 'none'
Puma::MiniSSL::VERIFY_NONE
end
@server.add_ssl_listener(@host, @port, ctx)
else
@server.add_tcp_listener(@host, @port)
end
# 設定執行緒的數量
@server.min_threads = 0
@server.max_threads = @threads
@codecs = Hash.new
# 建立文字型別對應的codecs對映
@additional_codecs.each do |content_type, codec|
@codecs[content_type] = LogStash::Plugin.lookup("codec", codec).new
end
end # def register
可以簡單的把上面的程式碼歸納為:
- 1 建立Puma伺服器,Puma是一款ruby的高效能伺服器。
- 2 進行使用者身份和密碼的驗證授權
- 3 設定基本的執行緒資訊
- 4 建立codecs對映
再看看run方法
def run(queue)
p = Proc.new do |req|
begin
remote_host = req['puma.socket'].peeraddr[3]
REJECTED_HEADERS.each {|k| req.delete(k) }
req = lowercase_keys(req)
body = req.delete("rack.input")
# 這裡使用相應的codec解析對應的Body資訊
@codecs.fetch(req["content_type"], @codec).decode(body.read) do |event|
# 這裡遍歷每個事件,然後新增host和headers資訊
event["host"] = remote_host
event["headers"] = req
decorate(event)
queue << event
end
# 如果正常處理,則返回ok
['200', @response_headers, ['ok']]
rescue => e
@logger.error("unable to process event #{req.inspect}. exception => #{e.inspect}")
['500', @response_headers, ['internal error']]
end
end
auth = Proc.new do |username, password|
username == @user && password == @password.value
end if (@user && @password)
@server.app = Rack::Builder.new do
use(Rack::Auth::Basic, &auth) if auth
use CompressedRequests
run(p)
end
@server.run.join
end
private
def lowercase_keys(hash)
new_hash = {}
hash.each_pair do |k,v|
new_hash[k.downcase] = v
end
new_hash
end
看了上面的程式碼,基本對http的原理有了一定了解吧!