Kafka系列之-自定義Producer
前面已經講到了,在Kafka中,Message是由Producer產生的,Producer產生的Message會發送到Topic的指定Partition中。Producer可以有多種形式,也可以由使用者通過Java,C以及Python語言來自定義。
Kafka中Producer的主要作用和地位如下圖所示,Producer通過獲取某個Topic指定Partition的Leader節點連線到Kafka叢集中,
一、Java Producer API
使用者可以基於Kafka提供的API自定義Producer,在這些API中有幾個主要的類:
1. kafka.javaapi.producer.Producer
類定義:
class Producer[ K,V ](private val underlying: kafka.producer.Producer[K ,V])
UML圖:
2. kafka.producer.ProducerConfig
類定義:
class ProducerConfig private (val props: VerifiableProperties)
extends AsyncProducerConfig with SyncProducerConfigShared
UML圖:
3. kafka.producer.KeyedMessage
類定義:
case class KeyedMessage[ K, V ](val topic: String, val key: K, val partKey: Any , val message: V)
二、自定義簡單的Producer
接下來根據上面的三個類,使用Java程式碼實現一個簡單的Producer向Broker傳送Message。這個Producer會為特定的Topic生成Message併發送到預設的Partition中。
具體程式碼和過程在程式碼和註釋中。
1、Java程式碼
package ckm.kafka.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
/**
* 一個簡單的Kafka Producer類,傳入兩個引數:
* topic num
* 設定主題和message條數
*
* 執行過程:
* 1、建立一個topic
* kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic xxxx
* 2、執行本類中的程式碼
* 3、檢視message
* kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx
* kafka
*/
public class SimpleKafkaProducer {
/**
* Producer的兩個泛型,第一個指定Key的型別,第二個指定value的型別
*/
private static Producer<String, String> producer;
public SimpleKafkaProducer() {
Properties props = new Properties();
/**
* 指定producer連線的broker列表
*/
props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092");
/**
* 指定message的序列化方法,使用者可以通過實現kafka.serializer.Encoder介面自定義該類
* 預設情況下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化
*/
props.put("serializer.class", "kafka.serializer.StringEncoder");
/**
* 這個引數用於通知broker接收到message後是否向producer傳送確認訊號
* 0 - 表示producer不用等待任何確認訊號,會一直髮送訊息,
* 否則producer進入等待狀態
* -1 - 表示leader狀態的replica需要等待所有in-sync狀態的replica都接收到訊息後才會向producer傳送確認訊號,
* 再次之前producer一直處於等待狀態
*/
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
}
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("Please Input Topic and Message Numbers");
}
String topic = (String) args[0];
int count = Integer.parseInt((String) args[1]);
System.out.println("Topic = " + topic);
System.out.println("Message Nums = " + count);
SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer();
simpleProducer.publishMessage(topic, count);
}
/**
* 根據topic和訊息條數傳送訊息
* @param topic
* @param count
*/
private void publishMessage(String topic, int count) {
for (int i = 0; i < count; i ++) {
String runtime = new Date().toString();
String msg = "Message published time - " + runtime;
System.out.println("msg = " + msg);
/**
* 第一個泛型指定用於分割槽的key的型別,第二個泛型指message的型別
* topic只能為String型別
*/
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
producer.send(data);
}
producer.close();
}
}
2、執行
(1)啟動ZooKeeper
$ZK_HOME/bin/zkServer.sh start
(2)啟動Kafka叢集
cd $KAFKA_HOME
nohup bin/kafka-server-start.sh config/server.properties &
(3)建立測試Topic
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 1 --partition 3 --topic simple-kafka-producer
(4)執行SimpleKafkaProducer 程式碼
執行該程式碼,向simple-kafka-producer Topic傳送10條Message
java -cp KafkaTestProgram.jar ckm.kafka.producer.SimpleKafkaProducer simple-kafka-producer 10
(5)檢視simple-kafka-producer中的Message
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic simple-kafka-producer
三、自定義Partition的Producer
這一節中除了實現Producer之外,還自定義了Message的Partition劃分過程。
在這裡,將會模擬一個網頁訪問日誌生成的過程,每條隨機生成的日誌Message中包含三個部分的資訊:
- 頁面訪問時間戳
- 頁面名稱
- 訪問頁面的IP地址
1、Java程式碼
(1)Producer
package ckm.kafka.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
/**
* 一個自定義分割槽的Kafka Producer類,傳入兩個引數:
* topic num
* 設定主題和message條數
*
* 模擬使用者點選日誌,日誌格式為:“時間,網址,IP地址"格式
*
* 自定義分割槽,通過IP地址最後一位與分割槽數求餘,message分散到0~partition - 1這些分割槽中
*
* 執行過程:
* 1、建立一個topic
* kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic xxxx
* 2、執行本類中的程式碼
* 3、檢視message
* kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx
* kafka
*/
public class KafkaProducerWithPartition {
/**
* Producer的兩個泛型,第一個指定Key的型別,第二個指定value的型別
*/
private static Producer<String, String> producer;
public KafkaProducerWithPartition() {
Properties props = new Properties();
/**
* 指定producer連線的broker列表
*/
props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092");
/**
* 指定message的序列化方法,使用者可以通過實現kafka.serializer.Encoder介面自定義該類
* 預設情況下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化
*/
props.put("serializer.class", "kafka.serializer.StringEncoder");
/**
* 這個引數用於通知broker接收到message後是否向producer傳送確認訊號
* 0 - 表示producer不用等待任何確認訊號,會一直髮送訊息
* 1 - 表示leader狀態的replica在接收到message後需要向producer傳送一個確認訊號,否則producer進入等待狀態
* -1 - 表示leader狀態的replica需要等待所有in-sync狀態的replica都接收到訊息後才會向producer傳送確認訊號,再次之前producer一直處於等待狀態
*/
props.put("request.required.acks", "1");
/**
* 指定partition類,自定義的分割槽類,繼承自kafka.producer.Partitioner介面
*/
props.put("partitioner.class", "ckm.kafka.producer.SimplePartitioner");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
}
public static void main(String[] args) {
if (args.length < 2) {
System.out.println("Please Input Topic and Message Numbers");
}
String topic = (String) args[0];
int count = Integer.parseInt((String) args[1]);
System.out.println("Topic = " + topic);
System.out.println("Message Nums = " + count);
KafkaProducerWithPartition simpleProducer = new KafkaProducerWithPartition();
simpleProducer.publishMessage(topic, count);
}
/**
* 根據topic和訊息條數傳送訊息
* @param topic
* @param count
*/
private void publishMessage(String topic, int count) {
Random random = new Random();
for (int i = 0; i < count; i ++) {
String runtime = new Date().toString();
// 訪問的IP地址
String clientIP = "192.168.1." + random.nextInt(255);
String msg = runtime + ",kafka.apache.org," + clientIP;
System.out.println("msg = " + msg);
/**
* 第一個泛型指定用於分割槽的key的型別,第二個泛型指message的型別
* topic只能為String型別
* 和上一個Producer相比,多了一個用於分割槽的key
*/
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, clientIP, msg);
producer.send(data);
}
producer.close();
}
}
(2)Partitioner
package ckm.kafka.producer;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
* Created by ckm on 2016/8/3.
*/
public class SimplePartitioner implements Partitioner {
/**
* 不寫這個方法,會報錯
* Exception in thread "main" java.lang.NoSuchMethodException: ckm.kafka.producer.SimplePartitioner.<init>(kafka.utils.VerifiableProperties)
* at java.lang.Class.getConstructor0(Class.java:2892)
* at java.lang.Class.getConstructor(Class.java:1723)
* at kafka.utils.Utils$.createObject(Utils.scala:436)
* at kafka.producer.Producer.<init>(Producer.scala:61)
* at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
* at ckm.kafka.producer.KafkaProducerWithPartition.<init>(KafkaProducerWithPartition.java:58)
* at ckm.kafka.producer.KafkaProducerWithPartition.main(KafkaProducerWithPartition.java:70)
* @param verifiableProperties
*/
public SimplePartitioner(VerifiableProperties verifiableProperties) {
}
public int partition(Object key, int numPartitions) {
int partition = 0;
String partitionKey = (String) key;
int offset = partitionKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt(partitionKey.substring(offset + 1)) % numPartitions;
}
return partition;
}
}
2、執行
由於前面已經啟動了ZooKeeper以及Kafka,這裡直接從建立Topic開始
(1)建立Topic
建立一個partition為3,replication為3的topic。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 3 --partitions 3 --topic partition-kafka-producer
如何使用list命令檢視該Topic,可以參考前面的示例
(2)執行Java程式碼
java -cp KafkaTestProgram.jar ckm.kafka.producer.KafkaProducerWithPartition partition-kafka-producer 100
往partition-kafka-producer Topic中寫入100條隨機生成的Message。
(3)檢視這些Message
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic partition-kafka-producer
四、自定義Producer的封裝
上面兩種自定義的Producer中,其實有很多程式碼是重複性的。接下來對Kafka自定義Producer進行一定的封裝,使其使用和配置更加簡便。
經過封裝後,producer有關的引數都寫在properties檔案中。
第二步中的Producer的呼叫方法為:
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
kafkaProducerTool.publishMessage("test message");
兩行程式碼就可以將該message傳送到配置的Kafka叢集指定的topic中。
第三步中的自定義Partitioner的Producer的呼叫方法為:
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
Properties producerProperties = kafkaProducerTool.getProducerProperties();
// 如果properties配置檔案中沒有配置該引數的話,手動設定
producerProperties.put("partitioner.class", "SimplePartitioner");
kafkaProducerTool.publishPartitionedMessage("partition-key", "test messate");