叢集下的kafka實現多執行緒消費
上一篇文章講述瞭如何部署kafka叢集,而這篇文章則來探討一下如何使用多執行緒消費,提高消費能力,保障資料的時效性。而實現多執行緒消費其實很簡單,只需要三步即可:
一:kafka叢集配置
多執行緒消費,說白了就是多區消費,kafka可以給topic設定多個partition,從而實現生產的時候提交到不同的分割槽,以減少統一區塊的壓力。而消費則是從不同的分割槽裡拿資料進行消費。
1.首先修改server.properties裡:
num.partitions=3
這裡等於3是因為本人的叢集是用了三臺機子,也就是3個broker,所以設定成3,具體數值可以根據叢集情況設定。
2.建立topic:
bin/kafka-topics.sh –create –zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 –replication-factor 3 –partitions 3 –topic test
這裡的3對應上面的配置裡的num.partitions=3
3.檢視topic資訊:
./kafka-topics.sh –zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 –topic test–describe
會有如下顯示錶示建立成功:
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
二:生產者隨機分割槽提交資料
這也是一個比較關鍵步驟,只有隨機提交到不同的分割槽才能實現多分割槽消費;
這裡借用了一片程式碼,自定義隨機分割槽:
public class MyPartition implements Partitioner{
private static Logger LOG = LoggerFactory.getLogger(MyPartition.class);
@Override
public void configure(Map<String, ?> arg0) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int partitionNum = 0;
try {
partitionNum = Integer.parseInt((String) key);
} catch (Exception e) {
partitionNum = key.hashCode() ;
}
// System.out.println("kafkaMessage topic:"+ topic+" |key:"+ key+" |value:"+value);
return Math.abs(partitionNum % numPartitions);
}
}
然後在初始化kafka生產者配置的時候修改如下配置
props.put("partitioner.class", properties.getProperty("com.mykafka.MyPartition"));
這樣就實現了kafka生產者隨機分割槽提交資料。
三:消費者
最後一步就是消費者,修改單執行緒模式為多執行緒,這裡的多執行緒實現方式有很多,本例知識用了最簡單的固定執行緒模式:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
kafkaConsumerService.getInstance();
}
});
}
在消費方面得注意,這裡得遍歷所有分割槽,否則還是隻消費了一個區:
ConsumerRecords<String, String> records = consumer.poll(1000);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records
.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(
"message==>key:" + record.key() + " value:" + record.value() + " offset:" + record.offset()
+ " 分割槽:" + record.partition());
if (record.value() == null || record.key() == null) {
consumer.commitSync();
} else {
// dealMessage
KafkaServer.dealMessage(record.key(),record.value(),consumer);
// consumer.commitSync();
}
}
}
注意上面的執行緒為啥只有3個,這裡得跟上面kafka的分割槽個數相對應起來,否則如果執行緒超過分割槽數量,那麼只會浪費執行緒,因為即使使用3個以上的執行緒也只會消費三個分割槽,而少了則無法消費完全。所以這裡必須更上面的對應起來。
至此,多執行緒消費就已經完畢。這也只是本人摸索以及借鑑的經驗,如有錯誤或者問題,請留言交流,互相學習。