logstash消費阿裏雲kafka消息
阿新 • • 發佈:2018-04-19
aliyun kafka elk logstash版本: 5.5.3 及以後
logstash消費阿裏雲kafka信息並返回到elasticsearch系統
logstash消費阿裏雲kafka信息並返回到elasticsearch系統
配置信息解析:
bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"] #kafka系統的連接地址 client_id => ‘tt‘ #客戶端上傳到es時,新增字段 group_id => "CID-LOG" #kafka分組的信息 auto_offset_reset => "latest" #從最新的偏移量開始消費 consumer_threads => 5 #decorate_events => true #此屬性會將當前topic、offset、group、partition等信息也帶到message中 topics => ["alikafka-cid-log"] #//數組類型,可配置多個topic type => "bhy" #//所有插件通用屬性,尤其在input裏面配置多個數據源時很有用 security_protocol => "SASL_SSL" #kafka連接阿裏雲的協議 sasl_mechanism => "ONS" #kafka阿裏雲的消費機制, logstash中默認的是 GSSAPI jaas_path => "/data/logstash/config/kafka_client_jaas.conf" # ONS登錄信息的路徑 ssl_keystore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘ #證書 ssl_truststore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘#信任證書 ssl_keystore_password => "xxxx" #證書密碼 ssl_truststore_password => "xxxx" #證書密碼
關鍵信息:
logstash使用ONS機制連接kafka時,需要需要用到一些額外的jar包,可以把開發所使用的jar包,都放到 /data/logstash/vendor/jruby/lib/ 下面。
我的配置模板:
input{ kafka { bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"] client_id => ‘tt‘ group_id => "CID-LOG" auto_offset_reset => "latest" #從最新的偏移量開始消費 consumer_threads => 5 #decorate_events => true #此屬性會將當前topic、offset、group、partition等信息也帶到message中 topics => ["alikafka-cid-log"] #//數組類型,可配置多個topic type => "bhy" #//所有插件通用屬性,尤其在input裏面配置多個數據源時很有用 security_protocol => "SASL_SSL" sasl_mechanism => "ONS" jaas_path => "/data/logstash/config/kafka_client_jaas.conf" ssl_keystore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘ ssl_truststore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘ ssl_keystore_password => "xx" ssl_truststore_password => "xx" } } output { elasticsearch { hosts => ["es-ip:9200"] user => ["xxxx"] password => ["xxxx"] index => ["services"] } stdout { codec=>plain } }
java參數優化路徑:
config/jvm.options
logstash消費阿裏雲kafka消息