1. 程式人生 > >Kafka系列之-自定義Producer

Kafka系列之-自定義Producer

  前面已經講到了,在Kafka中,Message是由Producer產生的,Producer產生的Message會發送到Topic的指定Partition中。Producer可以有多種形式,也可以由使用者通過Java,C以及Python語言來自定義。
  Kafka中Producer的主要作用和地位如下圖所示,Producer通過獲取某個Topic指定Partition的Leader節點連線到Kafka叢集中,
這裡寫圖片描述

一、Java Producer API

  使用者可以基於Kafka提供的API自定義Producer,在這些API中有幾個主要的類:

1. kafka.javaapi.producer.Producer


  類定義:

class Producer[ K,V ](private val underlying: kafka.producer.Producer[K ,V])

  UML圖:
  這裡寫圖片描述

2. kafka.producer.ProducerConfig
  類定義:   

class ProducerConfig private (val props: VerifiableProperties)
        extends AsyncProducerConfig with SyncProducerConfigShared

  UML圖:
  這裡寫圖片描述

3. kafka.producer.KeyedMessage


  類定義:

case class KeyedMessage[ K, V ](val topic: String, val key: K, val partKey: Any , val message: V)

二、自定義簡單的Producer

  接下來根據上面的三個類,使用Java程式碼實現一個簡單的Producer向Broker傳送Message。這個Producer會為特定的Topic生成Message併發送到預設的Partition中。
  具體程式碼和過程在程式碼和註釋中。
1、Java程式碼

package ckm.kafka.producer;

import
kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Date; import java.util.Properties; /** * 一個簡單的Kafka Producer類,傳入兩個引數: * topic num * 設定主題和message條數 * * 執行過程: * 1、建立一個topic * kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic xxxx * 2、執行本類中的程式碼 * 3、檢視message * kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx * kafka */ public class SimpleKafkaProducer { /** * Producer的兩個泛型,第一個指定Key的型別,第二個指定value的型別 */ private static Producer<String, String> producer; public SimpleKafkaProducer() { Properties props = new Properties(); /** * 指定producer連線的broker列表 */ props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092"); /** * 指定message的序列化方法,使用者可以通過實現kafka.serializer.Encoder介面自定義該類 * 預設情況下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化 */ props.put("serializer.class", "kafka.serializer.StringEncoder"); /** * 這個引數用於通知broker接收到message後是否向producer傳送確認訊號 * 0 - 表示producer不用等待任何確認訊號,會一直髮送訊息, * 否則producer進入等待狀態 * -1 - 表示leader狀態的replica需要等待所有in-sync狀態的replica都接收到訊息後才會向producer傳送確認訊號, * 再次之前producer一直處於等待狀態 */ props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); producer = new Producer<String, String>(config); } public static void main(String[] args) { if (args.length < 2) { System.out.println("Please Input Topic and Message Numbers"); } String topic = (String) args[0]; int count = Integer.parseInt((String) args[1]); System.out.println("Topic = " + topic); System.out.println("Message Nums = " + count); SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer(); simpleProducer.publishMessage(topic, count); } /** * 根據topic和訊息條數傳送訊息 * @param topic * @param count */ private void publishMessage(String topic, int count) { for (int i = 0; i < count; i ++) { String runtime = new Date().toString(); String msg = "Message published time - " + runtime; System.out.println("msg = " + msg); /** * 第一個泛型指定用於分割槽的key的型別,第二個泛型指message的型別 * topic只能為String型別 */ KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg); producer.send(data); } producer.close(); } }

2、執行
(1)啟動ZooKeeper

$ZK_HOME/bin/zkServer.sh start

這裡寫圖片描述
(2)啟動Kafka叢集

cd $KAFKA_HOME
nohup bin/kafka-server-start.sh config/server.properties &

這裡寫圖片描述
(3)建立測試Topic

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 1 --partition 3 --topic simple-kafka-producer

這裡寫圖片描述
(4)執行SimpleKafkaProducer 程式碼
  執行該程式碼,向simple-kafka-producer Topic傳送10條Message

java -cp KafkaTestProgram.jar ckm.kafka.producer.SimpleKafkaProducer simple-kafka-producer 10

這裡寫圖片描述
(5)檢視simple-kafka-producer中的Message

bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic simple-kafka-producer

這裡寫圖片描述

三、自定義Partition的Producer

  這一節中除了實現Producer之外,還自定義了Message的Partition劃分過程。
  在這裡,將會模擬一個網頁訪問日誌生成的過程,每條隨機生成的日誌Message中包含三個部分的資訊:
- 頁面訪問時間戳
- 頁面名稱
- 訪問頁面的IP地址
  
1、Java程式碼
(1)Producer

package ckm.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * 一個自定義分割槽的Kafka Producer類,傳入兩個引數:
 * topic num
 * 設定主題和message條數
 *
 * 模擬使用者點選日誌,日誌格式為:“時間,網址,IP地址"格式
 *
 * 自定義分割槽,通過IP地址最後一位與分割槽數求餘,message分散到0~partition - 1這些分割槽中
 *
 * 執行過程:
 * 1、建立一個topic
 * kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic xxxx
 * 2、執行本類中的程式碼
 * 3、檢視message
 * kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx
 * kafka
 */
public class KafkaProducerWithPartition {
    /**
     * Producer的兩個泛型,第一個指定Key的型別,第二個指定value的型別
     */
    private static Producer<String, String> producer;

    public KafkaProducerWithPartition() {
        Properties props = new Properties();
        /**
         * 指定producer連線的broker列表
         */
        props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092");
        /**
         * 指定message的序列化方法,使用者可以通過實現kafka.serializer.Encoder介面自定義該類
         * 預設情況下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /**
         * 這個引數用於通知broker接收到message後是否向producer傳送確認訊號
         *  0 - 表示producer不用等待任何確認訊號,會一直髮送訊息
         *  1 - 表示leader狀態的replica在接收到message後需要向producer傳送一個確認訊號,否則producer進入等待狀態
         * -1 - 表示leader狀態的replica需要等待所有in-sync狀態的replica都接收到訊息後才會向producer傳送確認訊號,再次之前producer一直處於等待狀態
         */
        props.put("request.required.acks", "1");
        /**
         * 指定partition類,自定義的分割槽類,繼承自kafka.producer.Partitioner介面
         */
        props.put("partitioner.class", "ckm.kafka.producer.SimplePartitioner");
        ProducerConfig config = new ProducerConfig(props);

        producer = new Producer<String, String>(config);
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("Please Input Topic and Message Numbers");
        }
        String topic = (String) args[0];
        int count = Integer.parseInt((String) args[1]);
        System.out.println("Topic = " + topic);
        System.out.println("Message Nums = " + count);

        KafkaProducerWithPartition simpleProducer = new KafkaProducerWithPartition();
        simpleProducer.publishMessage(topic, count);
    }

    /**
     * 根據topic和訊息條數傳送訊息
     * @param topic
     * @param count
     */
    private void publishMessage(String topic, int count) {
        Random random = new Random();
        for (int i = 0; i < count; i ++) {
            String runtime = new Date().toString();
            // 訪問的IP地址
            String clientIP = "192.168.1." + random.nextInt(255);
            String msg = runtime + ",kafka.apache.org," + clientIP;
            System.out.println("msg = " + msg);
            /**
             * 第一個泛型指定用於分割槽的key的型別,第二個泛型指message的型別
             * topic只能為String型別
             * 和上一個Producer相比,多了一個用於分割槽的key
             */
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, clientIP, msg);
            producer.send(data);
        }
        producer.close();
    }
}

(2)Partitioner

package ckm.kafka.producer;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Created by ckm on 2016/8/3.
 */
public class SimplePartitioner implements Partitioner {
    /**
     * 不寫這個方法,會報錯
     * Exception in thread "main" java.lang.NoSuchMethodException: ckm.kafka.producer.SimplePartitioner.<init>(kafka.utils.VerifiableProperties)
     * at java.lang.Class.getConstructor0(Class.java:2892)
     * at java.lang.Class.getConstructor(Class.java:1723)
     * at kafka.utils.Utils$.createObject(Utils.scala:436)
     * at kafka.producer.Producer.<init>(Producer.scala:61)
     * at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
     * at ckm.kafka.producer.KafkaProducerWithPartition.<init>(KafkaProducerWithPartition.java:58)
     * at ckm.kafka.producer.KafkaProducerWithPartition.main(KafkaProducerWithPartition.java:70)
     * @param verifiableProperties
     */
    public SimplePartitioner(VerifiableProperties verifiableProperties) {

    }

    public int partition(Object key, int numPartitions) {
        int partition = 0;
        String partitionKey = (String) key;
        int offset = partitionKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(partitionKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }
}

2、執行
  由於前面已經啟動了ZooKeeper以及Kafka,這裡直接從建立Topic開始
(1)建立Topic
  建立一個partition為3,replication為3的topic。

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 3 --partitions 3 --topic partition-kafka-producer

這裡寫圖片描述
  如何使用list命令檢視該Topic,可以參考前面的示例
 (2)執行Java程式碼

java -cp KafkaTestProgram.jar ckm.kafka.producer.KafkaProducerWithPartition partition-kafka-producer 100

  往partition-kafka-producer Topic中寫入100條隨機生成的Message。
這裡寫圖片描述
(3)檢視這些Message

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic partition-kafka-producer

這裡寫圖片描述

四、自定義Producer的封裝

  上面兩種自定義的Producer中,其實有很多程式碼是重複性的。接下來對Kafka自定義Producer進行一定的封裝,使其使用和配置更加簡便。
  經過封裝後,producer有關的引數都寫在properties檔案中。
  第二步中的Producer的呼叫方法為:

KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
kafkaProducerTool.publishMessage("test message");

  兩行程式碼就可以將該message傳送到配置的Kafka叢集指定的topic中。

  第三步中的自定義Partitioner的Producer的呼叫方法為:

KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
Properties producerProperties = kafkaProducerTool.getProducerProperties();
// 如果properties配置檔案中沒有配置該引數的話,手動設定
producerProperties.put("partitioner.class", "SimplePartitioner");
kafkaProducerTool.publishPartitionedMessage("partition-key", "test messate");