SpringBoot Log4j2發訊息到Kafka以及動態新增KafkaAppender
阿新 • • 發佈:2019-02-05
方法一:log4j2.xml檔案配置
<Appenders>
<Kafkaname
="KAFKA"topic="log4j-kafka" syncSend="false">
<!-- <ThresholdFilter level="error" onMatch="ACCEPT" onMismatch="DENY" /> -->
<MarkerFiltermarker ="kafkaLog"onMatch
="ACCEPT"onMismatch=
"DENY"/>
<PatternLayoutpattern ="%d{yyyy-MM-dd
HH:mm:ss,SSS}:%4p %t (%F:%L) - %m%n"/>
<Property
name ="bootstrap.servers"
>0.0.0.0:9092</
Property>
<Property
name ="retries"
>3</
Property>
<Property
name ="linger.ms"
>1000</
Property>
<Property
name ="buffer.memory"
> 10485760</
Property >
</Kafka>
</Appenders>
<Loggers>
<!-- additivity屬性作用是在使用當前Logger配置的Appender列印日誌 -->
<Logger
name ="com.hhaip"
level="INFO"
additivity ="false"
>
<AppenderRefref ="CONSOLE"/>
<AppenderRefref ="INTERPHONE"/>
<AppenderRefref ="KAFKA" />
</Logger>
<!-- Root表示所有Logger用Root中的Appender列印日誌 -->
<Root
level ="INFO"
>
<AppenderRefref ="CONSOLE"/>
<AppenderRefref ="INTERPHONE"/>
<AppenderRefref ="KAFKA"/>
</Root>
</Loggers>
注意不要定義key.serializer 、value.serializer和batch.size,log4j2裡已經幫我們設定好了
解釋:MarkerFilter是一種過濾器,marker過濾器的名稱,使用時只有加上這種過濾器的日誌才會輸出到kafka,如下訊息傳送
ThresholdFilter也是過濾器,用來指定過濾的級別,如:ERROR級別的才會傳送至kafka
PatternLayout是訊息的格式
syncSend指是否同步等待,設為false表示傳送訊息後立即返回,true則會等待kafka響應後返回(log4j2
2.8以上版本)
傳送訊息至kafka
在Java類中可以按如下方式傳送訊息至kafka:
log.info(Markers.KAFKA,"kafka訊息{}",
"log4j-"
+content);
kafka收到的訊息
Markers類的內容 import org.slf4j.Marker; import org.slf4j.MarkerFactory; publicclass Markers { publicstatic final Marker DB = MarkerFactory.getMarker("dbLog"); publicstatic final Marker KAFKA = MarkerFactory.getMarker("kafkaLog"); } 方法二:動態新增(SpringBoot專案) import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import javax.annotation.PostConstruct; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; import org.apache.logging.log4j.core.Logger; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.DefaultConfiguration; import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.filter.MarkerFilter; import org.apache.logging.log4j.core.layout.PatternLayout; import org.springframework.stereotype.Component; @Component publicclass KafkaLogAppender { @PostConstruct publicvoid init(){ final LoggerContext ctx = (LoggerContext) LogManager.getContext(false); final Configuration config = ctx .getConfiguration(); final Logger interLogger = ctx .getLogger("com.hhaip"); //需要寫日誌到資料庫的包名 List<Property>list = new ArrayList<>(); list .add(Property.createProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,"0.0.0.0:9092" )); list .add(Property.createProperty(ProducerConfig.ACKS_CONFIG , "1")); list .add(Property.createProperty(ProducerConfig.RETRIES_CONFIG , "3")); list .add(Property.createProperty(ProducerConfig.LINGER_MS_CONFIG , "10000")); list .add(Property.createProperty(ProducerConfig.BUFFER_MEMORY_CONFIG ,"10485760")); list .add(Property.createProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG ,"1000")); Property[]props = list .toArray(new Property[list .size()]); //配置Marker過濾器(標記過濾器和方法一的一樣) MarkerFilterfilter = MarkerFilter.createFilter("kafkaLog", Filter.Result.ACCEPT , Filter.Result.DENY); Configurationconfiguration = new DefaultConfiguration(); Layout<String>layout = PatternLayout.createLayout("%date %message",null , configuration , null , Charset.forName ("UTF-8" ),false , false , null,null); Appenderappender = KafkaAppender.createAppender(layout,filter, "KAFKA" ,true , "log4j-kafka" , props , configuration ); config .addAppender(appender ); interLogger .addAppender(appender ); appender .start(); ctx .updateLoggers(); } } 方法二和方法一的使用是一樣的,也許要定義一個Markers類. 注意:0.0.0.0換成你自己的IP地址
Markers類的內容 import org.slf4j.Marker; import org.slf4j.MarkerFactory; publicclass Markers { publicstatic final Marker DB = MarkerFactory.getMarker("dbLog"); publicstatic final Marker KAFKA = MarkerFactory.getMarker("kafkaLog"); } 方法二:動態新增(SpringBoot專案) import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import javax.annotation.PostConstruct; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Filter; import org.apache.logging.log4j.core.Layout; import org.apache.logging.log4j.core.Logger; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.DefaultConfiguration; import org.apache.logging.log4j.core.config.Property; import org.apache.logging.log4j.core.filter.MarkerFilter; import org.apache.logging.log4j.core.layout.PatternLayout; import org.springframework.stereotype.Component; @Component publicclass KafkaLogAppender { @PostConstruct publicvoid init(){ final LoggerContext ctx = (LoggerContext) LogManager.getContext(false); final Configuration config = ctx .getConfiguration(); final Logger interLogger = ctx .getLogger("com.hhaip"); //需要寫日誌到資料庫的包名 List<Property>list = new ArrayList<>(); list .add(Property.createProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG ,"0.0.0.0:9092" )); list .add(Property.createProperty(ProducerConfig.ACKS_CONFIG , "1")); list .add(Property.createProperty(ProducerConfig.RETRIES_CONFIG , "3")); list .add(Property.createProperty(ProducerConfig.LINGER_MS_CONFIG , "10000")); list .add(Property.createProperty(ProducerConfig.BUFFER_MEMORY_CONFIG ,"10485760")); list .add(Property.createProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG ,"1000")); Property[]props = list .toArray(new Property[list .size()]); //配置Marker過濾器(標記過濾器和方法一的一樣) MarkerFilterfilter = MarkerFilter.createFilter("kafkaLog", Filter.Result.ACCEPT , Filter.Result.DENY); Configurationconfiguration = new DefaultConfiguration(); Layout<String>layout = PatternLayout.createLayout("%date %message",null , configuration , null , Charset.forName ("UTF-8" ),false , false , null,null); Appenderappender = KafkaAppender.createAppender(layout,filter, "KAFKA" ,true , "log4j-kafka" , props , configuration ); config .addAppender(appender ); interLogger .addAppender(appender ); appender .start(); ctx .updateLoggers(); } } 方法二和方法一的使用是一樣的,也許要定義一個Markers類. 注意:0.0.0.0換成你自己的IP地址