kafkaChannel實現一個source下,不同日誌採集到kafka不同主題中
阿新 • • 發佈:2018-11-12
1.需求
使用flume採集資料,在使用一個source情況下,將不同的日誌採集到指定的kafka的主題中。
例如:有兩個日誌檔案:error.log和info.log
error.log採集到kafka的kafka_channel主題
info.log採集到kafka的kafka_channel2主題
2.解決方案
我們使用tailDir source 和kafkaChannel
思路:
使用a0.sources.r1.headers.f1.headerKey = error,a0.sources.r1.headers.f2.headerKey = info。去設定event的一個header值,不同檔案設定不同的header值,用於區分,其中headerKey可以隨便設定,就是header中的一個key而已, 在原始碼中找到kafka-channel,在都doPut()方法中,去獲去每一個event的header,我們知道event的hader一個map。然後header.get(headerKey)獲取我們設定的頭標記,如果是error,kafka的主題設定為kafka_channel如果是info,則kafka的主題設定為kafka_channel2,也就是如下程式碼邏輯。
String type=headers.get("headerKey"); if(type.equals("info")){ topicStr="kafka_channel2"; }else if(type.equals("error")){ topicStr="kafka_channel"; }
原始碼更改
更改前:
protected void doPut(Event event) throws InterruptedException { type = TransactionType.PUT; if (!producerRecords.isPresent()) { producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>()); } String key = event.getHeaders().get(KEY_HEADER); //get header Map<String, String> headers = event.getHeaders(); String topicStr=null; Integer partitionId = null; try { if (staticPartitionId != null) { partitionId = staticPartitionId; } if (partitionHeader != null) { String headerVal = event.getHeaders().get(partitionHeader); if (headerVal != null) { partitionId = Integer.parseInt(headerVal); } } if (partitionId != null) { producerRecords.get().add( new ProducerRecord<String, byte[]>(topic.get(), partitionId, key, serializeValue(event, parseAsFlumeEvent))); } else { producerRecords.get().add( new ProducerRecord<String, byte[]>(topic.get(), key, serializeValue(event, parseAsFlumeEvent))); } } catch (NumberFormatException e) { throw new ChannelException("Non integer partition id specified", e); } catch (Exception e) { throw new ChannelException("Error while serializing event", e); } }
更改後:
protected void doPut(Event event) throws InterruptedException {
type = TransactionType.PUT;
if (!producerRecords.isPresent()) {
producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
}
String key = event.getHeaders().get(KEY_HEADER);
//get header
Map<String, String> headers = event.getHeaders();
String topicStr=null;
Integer partitionId = null;
/**
* 在這可以更改程式碼邏輯,實現:資料傳送到指定的kafka分割槽中
*/
try {
if (staticPartitionId != null) {
partitionId = staticPartitionId;
}
if (partitionHeader != null) {
String headerVal = event.getHeaders().get(partitionHeader);
if (headerVal != null) {
partitionId = Integer.parseInt(headerVal);
}
}
/**
*新增的邏輯
*/
String type=headers.get("headerKey");
if(type.equals("info")){
topicStr="kafka_channel2";
}else if(type.equals("error")){
topicStr="kafka_channel";
}
if (partitionId != null) {
producerRecords.get().add(
new ProducerRecord<String, byte[]>(topicStr, partitionId, key,
serializeValue(event, parseAsFlumeEvent)));
} else {
producerRecords.get().add(
new ProducerRecord<String, byte[]>(topicStr, key,
serializeValue(event, parseAsFlumeEvent)));
}
} catch (NumberFormatException e) {
throw new ChannelException("Non integer partition id specified", e);
} catch (Exception e) {
throw new ChannelException("Error while serializing event", e);
}
}
採集方法
注意:
更改原始碼後,不需要在配置檔案中指定kafka的主題,當然指定主題也不錯,但是已經沒作用了,已經在程式碼中更改了。如果你有精力還可以把不同的kafka主題寫到properties配置檔案中,把程式寫活一點。在相同的思路下你還可以做到顆粒更細:就是指定主題和分割槽,通過條件判斷更改topic和partitionId。最後kafkaSink要想實現這些功能更改原始碼的思路是一樣的。
a0.sources = r1
a0.channels = c1
a0.sources.r1.type = TAILDIR
#通過 json 格式存下每個檔案消費的偏移量,避免從頭消費
a0.sources.r1.positionFile = /data/server/flume-1.8.0/conf/taildir_position.json
a0.sources.r1.filegroups = f1 f2
#配置f1資訊
a0.sources.r1.headers.f1.headerKey = error
a0.sources.r1.filegroups.f1 = /data/access/error.log
#配置f1資訊
a0.sources.r1.headers.f2.headerKey = info
a0.sources.r1.filegroups.f2 = /data/access/info.log
#是否新增一個儲存的絕對路徑名的標頭檔案
#a0.sources.r1.fileHeader = true
#攔截器獲取伺服器的主機名
a0.sources.r1.interceptors = i1 i2 i3
#a0.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a0.sources.r1.interceptors.i1.type = org.apache.flume.host.MyHostInterceptor$Builder
a0.sources.r1.interceptors.i1.preserveExisting = false
#a0.sources.r1.interceptors.i1.useIP = false
a0.sources.r1.interceptors.i1.HeaderName= agentHost
#靜態過濾器新增指定的標誌
a0.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.StaticInterceptor$Builder
a0.sources.r1.interceptors.i2.key = logType
a0.sources.r1.interceptors.i2.value= kafka_data
a0.sources.r1.interceptors.i2.preserveExisting = false
#新增時間戳
a0.sources.r1.interceptors.i3.type = timestamp
#定義channel
a0.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a0.channels.c1.kafka.bootstrap.servers = 10.2.40.10:9092,10.2.40.14:9092,10.2.40.15:9092
a0.channels.c1.parseAsFlumeEvent = false
#a0.channels.c1.kafka.producer.compression.type = lz4
a0.sources.r1.channels = c1