1. 程式人生 > >叢集下的kafka實現多執行緒消費

叢集下的kafka實現多執行緒消費

上一篇文章講述瞭如何部署kafka叢集,而這篇文章則來探討一下如何使用多執行緒消費,提高消費能力,保障資料的時效性。而實現多執行緒消費其實很簡單,只需要三步即可:

一:kafka叢集配置

多執行緒消費,說白了就是多區消費,kafka可以給topic設定多個partition,從而實現生產的時候提交到不同的分割槽,以減少統一區塊的壓力。而消費則是從不同的分割槽裡拿資料進行消費。

1.首先修改server.properties裡:
num.partitions=3
這裡等於3是因為本人的叢集是用了三臺機子,也就是3個broker,所以設定成3,具體數值可以根據叢集情況設定。
2.建立topic:
bin/kafka-topics.sh –create –zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 –replication-factor 3 –partitions 3 –topic test
這裡的3對應上面的配置裡的num.partitions=3
3.檢視topic資訊:
./kafka-topics.sh –zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 –topic test–describe
會有如下顯示錶示建立成功:
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2

二:生產者隨機分割槽提交資料

這也是一個比較關鍵步驟,只有隨機提交到不同的分割槽才能實現多分割槽消費;
這裡借用了一片程式碼,自定義隨機分割槽:

public class MyPartition implements Partitioner{
     private static Logger LOG = LoggerFactory.getLogger(MyPartition.class); 
    @Override
    public void configure(Map<String, ?> arg0) {
        // TODO Auto-generated method stub
} @Override public void close() { // TODO Auto-generated method stub } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int
numPartitions = partitions.size(); int partitionNum = 0; try { partitionNum = Integer.parseInt((String) key); } catch (Exception e) { partitionNum = key.hashCode() ; } // System.out.println("kafkaMessage topic:"+ topic+" |key:"+ key+" |value:"+value); return Math.abs(partitionNum % numPartitions); } }

然後在初始化kafka生產者配置的時候修改如下配置

props.put("partitioner.class", properties.getProperty("com.mykafka.MyPartition"));

這樣就實現了kafka生產者隨機分割槽提交資料。

三:消費者

最後一步就是消費者,修改單執行緒模式為多執行緒,這裡的多執行緒實現方式有很多,本例知識用了最簡單的固定執行緒模式:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    kafkaConsumerService.getInstance();
                }
            });
        }

在消費方面得注意,這裡得遍歷所有分割槽,否則還是隻消費了一個區:

        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (TopicPartition partition : records.partitions()) {  
            List<ConsumerRecord<String, String>> partitionRecords = records  
                    .records(partition); 
        for (ConsumerRecord<String, String> record : partitionRecords) {
            System.out.println(
                    "message==>key:" + record.key() + " value:" + record.value() + " offset:" + record.offset()
                    + " 分割槽:" + record.partition());
            if (record.value() == null || record.key() == null) {
                consumer.commitSync();
            } else {
                // dealMessage
                KafkaServer.dealMessage(record.key(),record.value(),consumer);
//              consumer.commitSync();
            }
         }
        }

注意上面的執行緒為啥只有3個,這裡得跟上面kafka的分割槽個數相對應起來,否則如果執行緒超過分割槽數量,那麼只會浪費執行緒,因為即使使用3個以上的執行緒也只會消費三個分割槽,而少了則無法消費完全。所以這裡必須更上面的對應起來。
至此,多執行緒消費就已經完畢。這也只是本人摸索以及借鑑的經驗,如有錯誤或者問題,請留言交流,互相學習。