1. 程式人生 > >logstash消費阿裏雲kafka消息

logstash消費阿裏雲kafka消息

aliyun kafka elk

logstash版本: 5.5.3 及以後
logstash消費阿裏雲kafka信息並返回到elasticsearch系統

配置信息解析:

        bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"]  #kafka系統的連接地址
        client_id => ‘tt‘   #客戶端上傳到es時,新增字段
        group_id => "CID-LOG"   #kafka分組的信息
        auto_offset_reset => "latest" #從最新的偏移量開始消費
        consumer_threads => 5
        #decorate_events => true #此屬性會將當前topic、offset、group、partition等信息也帶到message中
        topics => ["alikafka-cid-log"] #//數組類型,可配置多個topic
        type => "bhy" #//所有插件通用屬性,尤其在input裏面配置多個數據源時很有用
        security_protocol => "SASL_SSL"  #kafka連接阿裏雲的協議
        sasl_mechanism => "ONS"          #kafka阿裏雲的消費機制, logstash中默認的是 GSSAPI
        jaas_path => "/data/logstash/config/kafka_client_jaas.conf"  # ONS登錄信息的路徑
        ssl_keystore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘  #證書
        ssl_truststore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘#信任證書
        ssl_keystore_password => "xxxx"      #證書密碼
        ssl_truststore_password => "xxxx"    #證書密碼

關鍵信息:
logstash使用ONS機制連接kafka時,需要需要用到一些額外的jar包,可以把開發所使用的jar包,都放到 /data/logstash/vendor/jruby/lib/ 下面。

我的配置模板:

input{
      kafka {
        bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"]
        client_id => ‘tt‘
        group_id => "CID-LOG"
        auto_offset_reset => "latest" #從最新的偏移量開始消費
        consumer_threads => 5
        #decorate_events => true #此屬性會將當前topic、offset、group、partition等信息也帶到message中
        topics => ["alikafka-cid-log"] #//數組類型,可配置多個topic
        type => "bhy" #//所有插件通用屬性,尤其在input裏面配置多個數據源時很有用
        security_protocol => "SASL_SSL"
        sasl_mechanism => "ONS"
        jaas_path => "/data/logstash/config/kafka_client_jaas.conf"
        ssl_keystore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘
        ssl_truststore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘
        ssl_keystore_password => "xx"
        ssl_truststore_password => "xx"
      }
}

output {
   elasticsearch {
       hosts => ["es-ip:9200"]
       user => ["xxxx"]
       password => ["xxxx"]
       index => ["services"]
   }
   stdout {
       codec=>plain
   }
}

java參數優化路徑:

        config/jvm.options

logstash消費阿裏雲kafka消息