1. 程式人生 > >SpringBoot Log4j2發訊息到Kafka以及動態新增KafkaAppender

SpringBoot Log4j2發訊息到Kafka以及動態新增KafkaAppender

方法一:log4j2.xml檔案配置 <Appenders> <Kafkaname ="KAFKA"topic="log4j-kafka" syncSend="false">     <!-- <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY" />  -->   <MarkerFiltermarker ="kafkaLog"onMatch ="ACCEPT"onMismatch= "DENY"/>     <PatternLayoutpattern
="%d{yyyy-MM-dd HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n"/>  
  <Property name ="bootstrap.servers" >0.0.0.0:9092</ Property>    <Property name ="retries" >3</ Property>    <Property name ="linger.ms" >1000</ Property>    <Property name ="buffer.memory" > 10485760</ Property
> 
</Kafka> </Appenders> <Loggers>    <!-- additivity屬性作用是在使用當前Logger配置的Appender列印日誌 -->    <Logger name ="com.hhaip" level="INFO" additivity ="false" >        <AppenderRefref ="CONSOLE"/>        <AppenderRefref ="INTERPHONE"/>        <AppenderRefref ="KAFKA"
/>
   </Logger>    <!-- Root表示所有Logger用Root中的Appender列印日誌  -->    <Root level ="INFO" >       <AppenderRefref ="CONSOLE"/>       <AppenderRefref ="INTERPHONE"/>       <AppenderRefref ="KAFKA"/>    </Root> </Loggers> 注意不要定義key.serializervalue.serializerbatch.size,log4j2裡已經幫我們設定好了 解釋:MarkerFilter是一種過濾器,marker過濾器的名稱,使用時只有加上這種過濾器的日誌才會輸出到kafka,如下訊息傳送 ThresholdFilter也是過濾器,用來指定過濾的級別,如:ERROR級別的才會傳送至kafka      PatternLayout是訊息的格式 syncSend指是否同步等待,設為false表示傳送訊息後立即返回,true則會等待kafka響應後返回(log4j2 2.8以上版本) 傳送訊息至kafka 在Java類中可以按如下方式傳送訊息至kafka: log.info(Markers.KAFKA,"kafka訊息{}", "log4j-" +content); kafka收到的訊息
Markers類的內容 import org.slf4j.Marker; import org.slf4j.MarkerFactory; publicclass Markers {     publicstatic final Marker DB = MarkerFactory.getMarker("dbLog");     publicstatic final Marker KAFKA = MarkerFactory.getMarker("kafkaLog"); } 方法二:動態新增(SpringBoot專案) import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import javax.annotation.PostConstruct; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; import org.apache.logging.log4j.core.Logger; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.DefaultConfiguration; import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.filter.MarkerFilter; import org.apache.logging.log4j.core.layout.PatternLayout; import org.springframework.stereotype.Component; @Component publicclass KafkaLogAppender {     @PostConstruct     publicvoid init(){        final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);        final Configuration config = ctx .getConfiguration();        final Logger interLogger = ctx .getLogger("com.hhaip"); //需要寫日誌到資料庫的包名         List<Property>list = new ArrayList<>();        list .add(Property.createProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,"0.0.0.0:9092" ));        list .add(Property.createProperty(ProducerConfig.ACKS_CONFIG , "1"));        list .add(Property.createProperty(ProducerConfig.RETRIES_CONFIG , "3"));        list .add(Property.createProperty(ProducerConfig.LINGER_MS_CONFIG , "10000"));        list .add(Property.createProperty(ProducerConfig.BUFFER_MEMORY_CONFIG ,"10485760"));        list .add(Property.createProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG ,"1000"));         Property[]props = list .toArray(new Property[list .size()]);        //配置Marker過濾器(標記過濾器和方法一的一樣)         MarkerFilterfilter = MarkerFilter.createFilter("kafkaLog", Filter.Result.ACCEPT , Filter.Result.DENY);         Configurationconfiguration = new DefaultConfiguration();         Layout<String>layout = PatternLayout.createLayout("%date %message",null , configuration , null , Charset.forName ("UTF-8" ),false , false , null,null);         Appenderappender = KafkaAppender.createAppender(layout,filter, "KAFKA" ,true , "log4j-kafka" , props , configuration );        config .addAppender(appender );        interLogger .addAppender(appender );        appender .start();        ctx .updateLoggers();        } } 方法二和方法一的使用是一樣的,也許要定義一個Markers類. 注意:0.0.0.0換成你自己的IP地址