From Zero to Log Collection, Indexing, Visualization, Monitoring & Alerting, and RPC Trace Tracking - Log Indexing
阿新 • Published: 2019-01-30
Earlier posts covered the design of log events, how to instrument the code, and how JVM-based applications integrate with our system; this post describes how the logs get indexed. As the first three posts showed, anything written through logging calls such as LOGGER.info ends up in Kafka, so building the index simply means reading from Kafka in real time and writing to Elasticsearch. To raise the indexing throughput we deploy 3 instances that consume the topic's 9 partitions in parallel and use Elasticsearch's bulk API. In our tests, 3 PCs can index 20,000+ records per second in real time, while writing the Kafka data to files runs at roughly 500,000+ records per second, which fully covers our company's current real-time log collection and indexing needs. The code is fairly simple; the core of it looks like this:

```java
BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
int count = 0;
try {
    while (true) {
        ConsumerRecords<byte[], String> records = this.kafkaConsumerApp.poll(this.kafkaProperties.getPollTimeout());
        if (!records.isEmpty()) {
            for (ConsumerRecord<byte[], String> record : records) {
                String value = record.value();
                XContentBuilder source = this.buildXContentBuilder(value);
                if (source != null) {
                    bulkRequest.add(transportClient.prepareIndex(this.esProperties.getIndex(), this.esProperties.getDoc())
                            .setSource(source));
                } else {
                    LOGGER.info("record transform error, {}", value);
                }
                // track the next offset to commit for this partition
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                count++;
                if (count >= 1000) {
                    // once 1000 records have been accumulated, commit the offsets to Kafka
                    kafkaConsumerApp.commitAsync(currentOffsets, new KafkaOffsetCommitCallback());
                    count = 0;
                }
            }
            int size = bulkRequest.numberOfActions();
            if (size != 0) {
                bulkRequest.execute().actionGet();
            }
            LOGGER.info("total record: {}, indexed {} records to es", records.count(), size);
            bulkRequest = transportClient.prepareBulk();
            kafkaConsumerApp.commitAsync(currentOffsets, new KafkaOffsetCommitCallback());
        }
    }
} catch (WakeupException e) {
    // nothing to handle: wakeup() is the shutdown signal
    LOGGER.error("wakeup, start to shutdown", e);
} catch (Exception e) {
    LOGGER.error("process records error", e);
} finally {
    kafkaConsumerApp.commitSync(currentOffsets);
    LOGGER.info("finally commit the offset");
    // no need to call kafkaConsumer.close() here; the Spring bean container will close it
}
```

The Kafka consumer group used here is es-indexer-consume-group.
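The post only gives the consumer group name; the rest of the consumer setup is not shown. A minimal sketch of what it might look like, assuming auto-commit is disabled (offsets are committed manually in the loop above) and with an illustrative bootstrap-server list and topic name:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class EsIndexerConsumerFactory {

    // Builds the consumer used by the indexer; all values except the group id are illustrative.
    public static KafkaConsumer<byte[], String> create(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "es-indexer-consume-group");
        // Offsets are committed manually via commitAsync/commitSync, so disable auto-commit.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<byte[], String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic)); // topic name is an assumption
        return consumer;
    }
}
```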
```java
/**
 * Builds an XContentBuilder (the Elasticsearch document source) from a raw log line.
 * @param line the raw log line consumed from Kafka
 * @return the document source, or null if the line cannot be parsed
 */
private XContentBuilder buildXContentBuilder(String line) {
    try {
        LogDto logDto = new LogDto(line);
        return jsonBuilder()
                .startObject()
                .field(Constants.DAY, logDto.getDay())
                .field(Constants.TIME, logDto.getTime())
                .field(Constants.NANOTIME, logDto.getNanoTime())
                .field(Constants.CREATED, logDto.getCreated())
                .field(Constants.APP, logDto.getApp())
                .field(Constants.HOST, logDto.getHost())
                .field(Constants.THREAD, logDto.getThread())
                .field(Constants.LEVEL, logDto.getLevel())
                .field(Constants.EVENT_TYPE, logDto.getEventType())
                .field(Constants.PACK, logDto.getPack())
                .field(Constants.CLAZZ, logDto.getClazz())
                .field(Constants.LINE, logDto.getLine())
                .field(Constants.MESSAGE_SMART, logDto.getMessageSmart())
                .field(Constants.MESSAGE_MAX, logDto.getMessageMax())
                .endObject();
    } catch (Exception e) {
        // malformed lines return null and are skipped (and logged) by the caller
        return null;
    }
}
```

Since these are log records, a certain amount of loss and duplicate consumption is acceptable, but it should still be kept to a minimum.
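The KafkaOffsetCommitCallback passed to commitAsync above is referenced but not shown in the post; a minimal sketch, assuming its only job is to log failed asynchronous commits so they can be investigated:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Invoked by the consumer when an async commit completes; successful commits are ignored.
public class KafkaOffsetCommitCallback implements OffsetCommitCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOffsetCommitCallback.class);

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            LOGGER.error("async commit of offsets {} failed", offsets, exception);
        }
    }
}
```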
The code itself is straightforward; the main points to note:

- When consuming from Kafka, control the offsets yourself as much as possible, so that a Kafka failure does not lead to large-scale duplicate consumption or data loss.
- When the Kafka consumer rebalances, the offsets consumed so far need to be committed. commitSync(...) commits offsets synchronously and waits for the commit to complete; commitAsync(..., callback) commits asynchronously without waiting.
- Given the above, the synchronous commit belongs in the rebalance callback, while asynchronous commits are used on the normal consumption path; when a commit fails, the exception must be logged so the failure can be investigated. A rebalance-listener sketch follows this list.
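The post does not show how the synchronous commit is hooked into the rebalance; a minimal sketch, assuming the currentOffsets map from the consume loop is shared with a ConsumerRebalanceListener (the class name and the registration call are illustrative, not from the original code):

```java
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Commits the tracked offsets synchronously before partitions are revoked,
// so another consumer in the group does not re-read records we already indexed.
public class IndexerRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<byte[], String> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets;

    public IndexerRebalanceListener(KafkaConsumer<byte[], String> consumer,
                                    Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        this.consumer = consumer;
        this.currentOffsets = currentOffsets;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Synchronous commit: block until the offsets are safely stored before the rebalance proceeds.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do; consumption resumes from the committed offsets.
    }
}

// Registered when subscribing, e.g.:
// consumer.subscribe(Collections.singletonList("app-log-topic"),
//         new IndexerRebalanceListener(consumer, currentOffsets));
```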