記一次kafka消費能力優化
阿新 • • 發佈:2019-01-03
之前的程式碼:
有多個source:多個kafka,一個ES
1.消費者資料介面
interface Source {
List<String> poll();
}
2.impl
class KafkaSource implement Source {
List<String> poll() {
ConsumerRecords<String,String> records = kafkaConsumer.poll(500);
List<String> dataList = new ArrayList(); //linkedlist是否要好點
for(ConsumerRecord record : records ) {
String data = Adaptor.adaptor(record);
dataList.add(data);
}
return dataList ;
}
}
3.實際消費者
class Server {
Source source;
public Server(Source source) {
this.source = source;
}
void run() {
while(true ) {
List<String> dataList = source.poll();
for(String data : dataList) {
doSomething(data);
}
}
}
}
消費能力:10000條/s
1.消費者資料介面
改造(使用Vistor模式)
public interface Source {
void consume(SourceVistor sourceVistor);
interface SourceVistor{
void accept(Event event);
}
}
2.impl
class KafkaSource implement Source {
public void consume(final SourceVistor sourceVistor) {
executorService.submit(new Runnable() {
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(pollMillions);
for (ConsumerRecord<String, String> record : consumerRecords)
{
sourceVistor.accept(Adaptor.adapt(record.value()));
}
consumer.commitAsync();
}
}
});
}
}
3.實際消費者
public class Server {
@Override
public void start() {
//開始消費kafka
for (Source source : sources) {
source.consume(new MatchAccepter());
}
}}
private class MatchAccepter implements Source.SourceVistor{
public void accept(String data) {
doSomething(data);
}
}
}
4.修改kafka消費配置,指定消費資料量
<!-- 消費者通用配置 -->
<util:properties id="commonConsumerConfig">
<prop key="enable.auto.commit">${kafka.consumer.auto.commit.enable:false}</prop>
<prop key="request.timeout.ms">${kafka.consumer.request.timeout.ms:50000}</prop>
<prop key="auto.commit.interval.ms">${kafka.consumer.auto.commit.interval.ms:1000}</prop>
<prop key="max.partition.fetch.bytes">${kafka.consumer.max.partition.fetch.bytes:1000000}</prop>
<prop key="auto.offset.reset">${kafka.consumer.auto.offset.reset:latest}</prop>
<prop key="heartbeat.interval.ms">${kafka.consumer.heartbeat.interval.ms:25000}</prop>
<prop key="session.timeout.ms">${kafka.consumer.session.timeout.ms:30000}</prop>
<prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
<prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
</util:properties>
消費能力: 70000/s
總結:
可以看出,之前的方式,讀取出來再封裝進List再迴圈讀取這種方式非常簡單,也是第一時間想到的,但是效率比後者訪問者模式低了一個數量級(可能也和kafka配置有關.)改了之後,只需要讀取一次就消費,邏輯上來講,減少了2/3的浪費!!!
對於kafka這種我們監控資料,每秒鐘可能10W條資料,因此一點點地方都要注意,何況這個地方是重中之重!改了之後,效能立馬飆升!
(記憶體2G)
最後:Vistor模式很不錯!