The Journey of Shipping Logs to Kafka - How Far Can We Go, Part 49
Approach
There are many log-collection solutions out there, covering filtering, cleansing, analysis, and statistics, and they all look impressive. This article covers just one feature: shipping logs into Kafka.
Pipeline: app -> kafka -> logstash -> es -> kibana
The business application writes its logs straight into Kafka; Logstash consumes them and the data flows into ES.
In parallel, the application still writes ordinary log files on its own server.
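For completeness, here is a minimal sketch of the Logstash side of this pipeline. The values are illustrative: the broker address and topic match the configuration shown later, while the elasticsearch host and index pattern are assumptions, not our production config.

    input {
      kafka {
        bootstrap_servers => "10.0.11.55:9092"
        topics => ["prod-java"]
        codec => "json"
      }
    }
    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "prod-java-%{+YYYY.MM.dd}"
      }
    }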
Details
Initial Implementation
First, let's build an initial implementation. Setting up the ELK stack itself is out of scope here; just note that the versions of the various components must be compatible. This section walks through the implementation at the code level.
To write log data into Kafka, the official kafka client dependency should be all we need. Browsing GitHub, there is a ready-made project to start from: (link)
It isn't much code; read it through once, then modify it from there.
The core appender code:
public class KafkaAppender<E> extends KafkaAppenderConfig<E> {

    /**
     * Kafka clients uses this prefix for its slf4j logging.
     * This appender defers appends of any Kafka logs since it could cause harmful infinite recursion/self feeding effects.
     */
    private static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients";

    public static final Logger logger = LoggerFactory.getLogger(KafkaAppender.class);

    private LazyProducer lazyProducer = null;
    private final AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
    private final FailedDeliveryCallback<E> failedDeliveryCallback = new FailedDeliveryCallback<E>() {
        @Override
        public void onFailedDelivery(E evt, Throwable throwable) {
            // replay failed events to any attached fallback appenders
            aai.appendLoopOnAppenders(evt);
        }
    };

    public KafkaAppender() {
        // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
        addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    }

    @Override
    public void doAppend(E e) {
        ensureDeferredAppends();
        if (e instanceof ILoggingEvent && ((ILoggingEvent) e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) {
            deferAppend(e);
        } else {
            super.doAppend(e);
        }
    }

    @Override
    public void start() {
        // only error-free appenders should be activated
        if (!checkPrerequisites()) return;
        lazyProducer = new LazyProducer();
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
        if (lazyProducer != null && lazyProducer.isInitialized()) {
            try {
                lazyProducer.get().close();
            } catch (KafkaException e) {
                this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e);
            }
            lazyProducer = null;
        }
    }

    @Override
    public void addAppender(Appender<E> newAppender) {
        aai.addAppender(newAppender);
    }

    @Override
    public Iterator<Appender<E>> iteratorForAppenders() {
        return aai.iteratorForAppenders();
    }

    @Override
    public Appender<E> getAppender(String name) {
        return aai.getAppender(name);
    }

    @Override
    public boolean isAttached(Appender<E> appender) {
        return aai.isAttached(appender);
    }

    @Override
    public void detachAndStopAllAppenders() {
        aai.detachAndStopAllAppenders();
    }

    @Override
    public boolean detachAppender(Appender<E> appender) {
        return aai.detachAppender(appender);
    }

    @Override
    public boolean detachAppender(String name) {
        return aai.detachAppender(name);
    }

    @Override
    protected void append(E e) {
        // encode the event into the payload written to Kafka
        final byte[] payload = encoder.doEncode(e);
        final byte[] key = keyingStrategy.createKey(e);
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, key, payload);
        Producer<byte[], byte[]> producer = lazyProducer.get();
        if (producer == null) {
            logger.error("kafka producer is null");
            return;
        }
        // the core send
        deliveryStrategy.send(producer, record, e, failedDeliveryCallback);
    }

    protected Producer<byte[], byte[]> createProducer() {
        return new KafkaProducer<byte[], byte[]>(new HashMap<String, Object>(producerConfig));
    }

    private void deferAppend(E event) {
        queue.add(event);
    }

    // drains queue events to super
    private void ensureDeferredAppends() {
        E event;
        while ((event = queue.poll()) != null) {
            super.doAppend(event);
        }
    }

    /**
     * Lazy initializer for producer, patterned after commons-lang.
     *
     * @see <a href="https://commons.apache.org/proper/commons-lang/javadocs/api-3.4/org/apache/commons/lang3/concurrent/LazyInitializer.html">LazyInitializer</a>
     */
    private class LazyProducer {

        private volatile Producer<byte[], byte[]> producer;
        private boolean initialized;

        public Producer<byte[], byte[]> get() {
            Producer<byte[], byte[]> result = this.producer;
            if (result == null) {
                synchronized (this) {
                    if (!initialized) {
                        result = this.producer;
                        if (result == null) {
                            // Note: initialize() can fail (for example when the servers string is
                            // invalid) and return a null producer. The initialized flag is therefore
                            // what guards against repeated initialization, so that a broken config
                            // does not trigger an endless stream of failing initializations.
                            this.producer = result = this.initialize();
                            initialized = true;
                        }
                    }
                }
            }
            return result;
        }

        protected Producer<byte[], byte[]> initialize() {
            Producer<byte[], byte[]> producer = null;
            try {
                producer = createProducer();
            } catch (Exception e) {
                addError("error creating producer", e);
            }
            return producer;
        }

        public boolean isInitialized() {
            return producer != null;
        }
    }
}
The code above guards producer creation with an initialized flag, so that in failure scenarios initialization is attempted exactly once.
This matters in practice: when our servers config contained a string that was not a valid address, initialize() returned null. In the original code, the decision to call initialize() was based purely on whether producer was null, so it fell into an endless cycle of failing initializations, which ultimately prevented the application from starting.
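To see that failure mode in isolation, here is a small standalone sketch (illustrative only, not part of the appender) of what constructing a KafkaProducer with a bad servers string does:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class BadServersDemo {
        public static void main(String[] args) {
            Map<String, Object> cfg = new HashMap<>();
            // a host without a port fails bootstrap-server validation
            cfg.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "not-a-valid-server");
            cfg.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            cfg.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            try {
                new KafkaProducer<byte[], byte[]>(cfg);
            } catch (KafkaException e) {
                // this (or its ConfigException subclass) is what initialize() catches and
                // turns into a null producer; the initialized flag then stops further retries
                System.out.println(e.getMessage());
            }
        }
    }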
Configure logback-spring.xml:
<springProperty scope="context" name="LOG_KAFKA_SERVERS" source="application.log.kafka.bootstrap.servers"/>
<springProperty scope="context" name="LOG_KAFKA_TOPIC" source="application.log.kafka.topic"/>

<appender name="KafkaAppender" class="com.framework.common.log.kafka.KafkaAppender">
    <topic>${LOG_KAFKA_TOPIC}</topic>
    <producerConfig>bootstrap.servers=${LOG_KAFKA_SERVERS}</producerConfig>
</appender>
And in bootstrap.properties:
application.log.kafka.bootstrap.servers=10.0.11.55:9092
application.log.kafka.topic=prod-java
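Since KafkaAppender implements the appender-attachment methods and onFailedDelivery replays events to attached appenders, a fallback appender can also be nested directly in the XML. A sketch, assuming a FALLBACK appender (console or file) is defined elsewhere in the same config:

    <appender name="KafkaAppender" class="com.framework.common.log.kafka.KafkaAppender">
        <topic>${LOG_KAFKA_TOPIC}</topic>
        <producerConfig>bootstrap.servers=${LOG_KAFKA_SERVERS}</producerConfig>
        <!-- events whose Kafka delivery fails are appended here instead -->
        <appender-ref ref="FALLBACK"/>
    </appender>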
To customize the JSON written to Kafka, we extend the encoder.doEncode(e) call seen above:
public class FormatKafkaMessageEncoder<E> extends KafkaMessageEncoderBase<E> {

    protected static final int BUILDER_CAPACITY = 2048;
    protected static final int LENGTH_OPTION = 2048;
    public static final String CAUSED_BY = "Caused by: ";
    public static final String SUPPRESSED = "Suppressed: ";
    public static final char TAB = '\t';

    public byte[] encode(ILoggingEvent event) {
        Map<String, String> formatMap = new HashMap<>();
        formatMap.put("timestamp", event.getTimeStamp() != 0 ? String.valueOf(new Date(event.getTimeStamp())) : "");
        formatMap.put("span", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-SpanId") : "");
        formatMap.put("trace", event.getMDCPropertyMap() != null ? event.getMDCPropertyMap().get("X-B3-TraceId") : "");
        formatMap.put("class", event.getLoggerName());
        formatMap.put("level", event.getLevel() != null ? event.getLevel().toString() : "");
        formatMap.put("message", event.getMessage());
        formatMap.put("stacktrace", event.getThrowableProxy() != null ? convertStackTrace(event.getThrowableProxy()) : "");
        formatMap.put("thread", event.getThreadName());
        formatMap.put("ip", IpUtil.getLocalIP());
        formatMap.put("application", event.getLoggerContextVO() != null && event.getLoggerContextVO().getPropertyMap() != null
                ? event.getLoggerContextVO().getPropertyMap().get("springAppName") : "");
        String formatJson = JSONObject.toJSONString(formatMap);
        return formatJson.getBytes();
    }

    @Override
    public byte[] doEncode(E event) {
        return encode((ILoggingEvent) event);
    }

    public String convertStackTrace(IThrowableProxy tp) {
        StringBuilder sb = new StringBuilder(BUILDER_CAPACITY);
        recursiveAppend(sb, tp, null);
        return sb.toString();
    }

    private void recursiveAppend(StringBuilder sb, IThrowableProxy tp, String prefix) {
        if (tp == null) {
            return;
        }
        if (prefix != null) {
            sb.append(prefix);
        }
        sb.append(tp.getClassName()).append(": ").append(tp.getMessage());
        sb.append(CoreConstants.LINE_SEPARATOR);
        StackTraceElementProxy[] stepArray = tp.getStackTraceElementProxyArray();
        // cap the number of frames printed at LENGTH_OPTION
        boolean unrestrictedPrinting = LENGTH_OPTION > stepArray.length;
        int maxIndex = unrestrictedPrinting ? stepArray.length : LENGTH_OPTION;
        for (int i = 0; i < maxIndex; i++) {
            sb.append(TAB);
            sb.append(stepArray[i]);
            sb.append(CoreConstants.LINE_SEPARATOR);
        }
        IThrowableProxy[] suppressed = tp.getSuppressed();
        if (suppressed != null) {
            for (IThrowableProxy current : suppressed) {
                recursiveAppend(sb, current, SUPPRESSED);
            }
        }
        recursiveAppend(sb, tp.getCause(), CAUSED_BY);
    }
}
The recursiveAppend method mimics ch.qos.logback.classic.spi.ThrowableProxyUtil and prints the complete exception stack, following the Caused by and Suppressed chains.
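As a quick illustration (a hypothetical snippet, not from the project), wrapping a caught exception in logback's ThrowableProxy reproduces what the encoder stores under stacktrace:

    try {
        throw new IllegalStateException("boom", new IllegalArgumentException("root cause"));
    } catch (Exception ex) {
        FormatKafkaMessageEncoder<ILoggingEvent> encoder = new FormatKafkaMessageEncoder<>();
        // ch.qos.logback.classic.spi.ThrowableProxy is the concrete IThrowableProxy
        String stack = encoder.convertStackTrace(new ThrowableProxy(ex));
        // one frame per line, then "Caused by: java.lang.IllegalArgumentException: root cause"
        System.out.println(stack);
    }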
There is also the problem of obtaining the local IP: InetAddress.getLocalHost().getHostAddress() does not solve it, since it resolves the hostname and can return 127.0.0.1 depending on the hosts file.
The full code:
public class IpUtil {

    public static final String DEFAULT_IP = "127.0.0.1";
    private static String cacheLocalIp = null;
    private static Logger logger = LoggerFactory.getLogger(IpUtil.class);

    /**
     * Takes the IPv4 address of the first active, non-loopback network interface,
     * to avoid returning 127.0.0.1.
     */
    private static String getLocalIpByNetworkCard() {
        String ip = null;
        try {
            for (Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces(); e.hasMoreElements(); ) {
                NetworkInterface item = e.nextElement();
                // skip loopback and inactive interfaces
                if (item.isLoopback() || !item.isUp()) {
                    continue;
                }
                for (InterfaceAddress address : item.getInterfaceAddresses()) {
                    if (address.getAddress() instanceof Inet4Address) {
                        ip = address.getAddress().getHostAddress();
                        break; // stop at the first IPv4 address found
                    }
                }
                if (ip != null) {
                    break;
                }
            }
        } catch (Exception e) {
            logger.error("getLocalIpByNetworkCard error", e);
            try {
                ip = InetAddress.getLocalHost().getHostAddress();
            } catch (Exception e1) {
                logger.error("InetAddress.getLocalHost().getHostAddress() error", e1);
                ip = DEFAULT_IP;
            }
        }
        return ip == null ? DEFAULT_IP : ip;
    }

    public static synchronized String getLocalIP() {
        if (cacheLocalIp == null) {
            cacheLocalIp = getLocalIpByNetworkCard();
        }
        return cacheLocalIp;
    }
}
We also configure a local file appender in logback-spring.xml:
<!-- roll the log file daily -->
<appender name="filelog" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
        <!-- rollover daily; %i is the index used when a day's logs exceed maxFileSize -->
        <fileNamePattern>${LOG_FOLDER}/${springAppName}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
        <!-- each file should be at most 300MB -->
        <maxFileSize>300MB</maxFileSize>
        <!-- keep 5 days' worth of history -->
        <maxHistory>5</maxHistory>
    </rollingPolicy>
    <encoder>
        <!-- formatted output: %d is the date, %thread the thread name, %-5level the level
             padded to 5 characters, %msg the log message, %n a line separator -->
        <pattern>${CONSOLE_LOG_PATTERN}</pattern>
    </encoder>
</appender>
Note that we use SizeAndTimeBasedRollingPolicy rather than TimeBasedRollingPolicy combined with SizeBasedTriggeringPolicy: with the latter combination, the size trigger takes precedence and new log files are no longer created by date. SizeAndTimeBasedRollingPolicy rolls on both, which is also why the %i token appears in fileNamePattern.
At this point the code for shipping logs into Kafka is complete: fully functional and working correctly.
Failure Scenarios
Now consider: what happens to this logging feature if Kafka cannot accept messages while the application is starting or running, say the brokers are down?
Every log write would then attempt, and fail, to reach Kafka, which inevitably affects the health of the application.
This leads to the idea of circuit breaking: if Kafka goes down, tripping a breaker limits the impact on the application.
So we implemented a simple circuit breaker.
State transitions: closed -> open once the failure threshold is reached; open -> half-open after the timeout window; half-open -> closed after enough consecutive successes, or back to open on any failure.
The circuit breaker:
/**
 * @desc Circuit breaker
 * 1. failureCount and consecutiveSuccessCount drive the state transitions; both are AtomicIntegers so the counts stay accurate under concurrency
 * 2. successCount is not an AtomicInteger, so its accuracy is not guaranteed
 * 3. Invalid failureThreshold, consecutiveSuccessThreshold and timeout arguments fall back to defaults
 */
public class CircuitBreaker {

    private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);

    private String name;
    /**
     * Current breaker state (written by multiple threads, hence volatile)
     */
    private volatile CircuitBreakerState state;
    /**
     * Failure count threshold
     */
    private int failureThreshold;
    /**
     * How long the breaker stays open before probing again
     */
    private long timeout;
    /**
     * Failure count
     */
    private AtomicInteger failureCount;
    /**
     * Success count (not accurate under concurrency)
     */
    private int successCount;
    /**
     * Consecutive successes within the half-open window
     */
    private AtomicInteger consecutiveSuccessCount;
    /**
     * Threshold of consecutive successes within the half-open window
     */
    private int consecutiveSuccessThreshold;

    public CircuitBreaker(String name, int failureThreshold, int consecutiveSuccessThreshold, long timeout) {
        if (failureThreshold <= 0) {
            failureThreshold = 1;
        }
        if (consecutiveSuccessThreshold <= 0) {
            consecutiveSuccessThreshold = 1;
        }
        if (timeout <= 0) {
            timeout = 10000;
        }
        this.name = name;
        this.failureThreshold = failureThreshold;
        this.consecutiveSuccessThreshold = consecutiveSuccessThreshold;
        this.timeout = timeout;
        this.failureCount = new AtomicInteger(0);
        this.consecutiveSuccessCount = new AtomicInteger(0);
        state = new CloseCircuitBreakerState(this);
    }

    public void increaseFailureCount() {
        failureCount.addAndGet(1);
    }

    public void increaseSuccessCount() {
        successCount++;
    }

    public void increaseConsecutiveSuccessCount() {
        consecutiveSuccessCount.addAndGet(1);
    }

    public boolean increaseFailureCountAndThresholdReached() {
        return failureCount.addAndGet(1) >= failureThreshold;
    }

    public boolean increaseConsecutiveSuccessCountAndThresholdReached() {
        return consecutiveSuccessCount.addAndGet(1) >= consecutiveSuccessThreshold;
    }

    public boolean isNotOpen() {
        return !isOpen();
    }

    /**
     * Breaker open: calls to the protected method are blocked
     */
    public boolean isOpen() {
        return state instanceof OpenCircuitBreakerState;
    }

    /**
     * Breaker closed: the protected method executes normally
     */
    public boolean isClose() {
        return state instanceof CloseCircuitBreakerState;
    }

    /**
     * Breaker half-open: the protected method may be called as a probe
     */
    public boolean isHalfClose() {
        return state instanceof HalfOpenCircuitBreakerState;
    }

    public void transformToCloseState() {
        state = new CloseCircuitBreakerState(this);
    }

    public void transformToHalfOpenState() {
        state = new HalfOpenCircuitBreakerState(this);
    }

    public void transformToOpenState() {
        state = new OpenCircuitBreakerState(this);
    }

    /**
     * Reset the failure count
     */
    public void resetFailureCount() {
        failureCount.set(0);
    }

    /**
     * Reset the consecutive success count
     */
    public void resetConsecutiveSuccessCount() {
        consecutiveSuccessCount.set(0);
    }

    public long getTimeout() {
        return timeout;
    }

    /**
     * Whether the failure threshold has been reached
     */
    protected boolean failureThresholdReached() {
        return failureCount.get() >= failureThreshold;
    }

    /**
     * Whether the consecutive success threshold has been reached
     */
    protected boolean consecutiveSuccessThresholdReached() {
        return consecutiveSuccessCount.get() >= consecutiveSuccessThreshold;
    }

    /**
     * Invoked after the protected method fails
     */
    public void actFailed() {
        state.actFailed();
    }

    /**
     * Invoked after the protected method succeeds
     */
    public void actSuccess() {
        state.actSuccess();
    }

    public interface Executor {
        /**
         * The task to run under the breaker
         */
        void execute();
    }

    public void execute(Executor executor) {
        if (!isOpen()) {
            try {
                executor.execute();
                this.actSuccess();
            } catch (Exception e) {
                this.actFailed();
                logger.error("CircuitBreaker executor error", e);
            }
        } else {
            logger.error("CircuitBreaker named {} is open", this.name);
        }
    }

    public String show() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", name);
        map.put("isClose", isClose());
        map.put("failureThreshold", failureThreshold);
        map.put("failureCount", failureCount);
        map.put("consecutiveSuccessThreshold", consecutiveSuccessThreshold);
        map.put("consecutiveSuccessCount", consecutiveSuccessCount);
        map.put("successCount", successCount);
        map.put("timeout", timeout);
        map.put("stateClass", state.getClass());
        return JSONObject.toJSONString(map);
    }
}
The state machine:
public interface CircuitBreakerState {
    /**
     * Invoked after the protected method fails
     */
    void actFailed();

    /**
     * Invoked after the protected method succeeds
     */
    void actSuccess();
}

public abstract class AbstractCircuitBreakerState implements CircuitBreakerState {

    protected CircuitBreaker circuitBreaker;

    public AbstractCircuitBreakerState(CircuitBreaker circuitBreaker) {
        this.circuitBreaker = circuitBreaker;
    }

    @Override
    public void actFailed() {
        circuitBreaker.increaseFailureCount();
    }

    @Override
    public void actSuccess() {
        circuitBreaker.increaseSuccessCount();
    }
}

public class CloseCircuitBreakerState extends AbstractCircuitBreakerState {

    public CloseCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetFailureCount();
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        // failure threshold reached: trip the breaker open
        if (circuitBreaker.increaseFailureCountAndThresholdReached()) {
            circuitBreaker.transformToOpenState();
        }
    }
}

public class HalfOpenCircuitBreakerState extends AbstractCircuitBreakerState {

    public HalfOpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        circuitBreaker.resetConsecutiveSuccessCount();
    }

    @Override
    public void actFailed() {
        super.actFailed();
        // any failure while half-open re-opens the breaker
        circuitBreaker.transformToOpenState();
    }

    @Override
    public void actSuccess() {
        super.actSuccess();
        // consecutive success threshold reached: close the breaker
        if (circuitBreaker.increaseConsecutiveSuccessCountAndThresholdReached()) {
            circuitBreaker.transformToCloseState();
        }
    }
}

public class OpenCircuitBreakerState extends AbstractCircuitBreakerState {

    public OpenCircuitBreakerState(CircuitBreaker circuitBreaker) {
        super(circuitBreaker);
        // after the timeout window, move to half-open and let probe calls through
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                circuitBreaker.transformToHalfOpenState();
                timer.cancel();
            }
        }, circuitBreaker.getTimeout());
    }
}
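A minimal sketch (assuming the classes above are on the classpath) that drives the breaker through the whole cycle:

    public class CircuitBreakerDemo {
        public static void main(String[] args) throws InterruptedException {
            // open after 2 failures, close after 1 success while half-open, probe after 100ms
            CircuitBreaker cb = new CircuitBreaker("demo", 2, 1, 100);
            cb.actFailed();
            cb.actFailed();                        // failure threshold reached -> open
            System.out.println(cb.isOpen());       // true
            Thread.sleep(200);                     // let the open-state timer fire
            System.out.println(cb.isHalfClose());  // true: half-open
            cb.actSuccess();                       // one consecutive success -> closed
            System.out.println(cb.isClose());      // true
        }
    }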
/**
 * @desc Circuit breaker factory: a central registry for the application's CircuitBreakers.
 * Note: once created, a breaker lives as long as the application; it is never removed.
 */
public class CircuitBreakerFactory {

    private static ConcurrentHashMap<String, CircuitBreaker> circuitBreakerMap = new ConcurrentHashMap<>();

    public static CircuitBreaker getCircuitBreaker(String name) {
        return circuitBreakerMap.get(name);
    }

    /**
     * @param name unique name
     * @param failureThreshold failure count threshold
     * @param consecutiveSuccessThreshold consecutive success threshold within the window
     * @param timeout window length in milliseconds
     * 1. closed: once failures >= failureThreshold, the breaker opens
     * 2. open: after each timeout period the breaker moves to half-open
     * 3. half-open: consecutiveSuccessThreshold consecutive successes close the breaker;
     *    any failure re-opens it
     * @return the newly built breaker, registered under its name
     */
    public static CircuitBreaker buildCircuitBreaker(String name, int failureThreshold, int consecutiveSuccessThreshold, long timeout) {
        CircuitBreaker circuitBreaker = new CircuitBreaker(name, failureThreshold, consecutiveSuccessThreshold, timeout);
        circuitBreakerMap.put(name, circuitBreaker);
        return circuitBreaker;
    }
}
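Usage sketch for the factory together with the execute wrapper (callFlakyRemoteService is a hypothetical stand-in):

    CircuitBreaker breaker = CircuitBreakerFactory.buildCircuitBreaker("remote-api", 3, 2, 20000);
    // Executor has a single abstract method, so a lambda works; while the breaker
    // is open the task is skipped instead of being attempted
    breaker.execute(() -> callFlakyRemoteService());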
The delivery strategy's send method wraps the Kafka producer with the breaker. Note that while the breaker is open, events are not even handed to the failure callback; they simply skip the Kafka path, and the local file appender remains the reliable record.

/**
 * Logging is not a core business service. To keep an unstable Kafka from dragging down the
 * application, we wrap sends in a circuit breaker: open after 3 failures, half-open every
 * 20 seconds, closed again after 2 consecutive successes.
 */
CircuitBreaker circuitBreaker = CircuitBreakerFactory.buildCircuitBreaker("KafkaAppender-c", 3, 2, 20000);
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
                              final FailedDeliveryCallback<E> failedDeliveryCallback) {
    if (circuitBreaker.isNotOpen()) {
        try {
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    circuitBreaker.actFailed();
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                    logger.error("kafka producer send log error", exception);
                } else {
                    circuitBreaker.actSuccess();
                }
            });
            return true;
        } catch (KafkaException e) {
            circuitBreaker.actFailed();
            failedDeliveryCallback.onFailedDelivery(event, e);
            logger.error("kafka send log error", e);
            return false;
        }
    } else {
        logger.error("kafka log circuitBreaker open");
        return false;
    }
}
Summary
1. When building the ELK stack, pay close attention to version compatibility between components; in particular, the kafka client version must match the Kafka broker version.
2. The design tolerates lost Kafka log deliveries, with the local log files as the more reliable record, so the circuit breaker is there for the worst case. The same breaker pattern can also be reused for calls to other third parties.