Kafka producer端開發程式碼例項
一、producer工作流程
producer使用使用者啟動producer的執行緒,將待發送的訊息封裝到一個ProducerRecord類例項,然後將其序列化之後傳送給partitioner,再由後者確定目標分割槽後一同傳送到位於producer程式中的一塊記憶體緩衝區中。而producer的另外一個執行緒(Sender執行緒)則負責實時從該緩衝區中提取出準備就緒的訊息封裝進一個批次(batch),統一發送給對應的broker,具體流程如下圖:
二、producer示例程式開發
首先引入kafka相關依賴,在pom.xml檔案中加入如下依賴:
<!--kafka--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.2.0</version> </dependency>
在resources下面建立kafka-producer.properties配置檔案,用於設定kafka引數,內容如下:
bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer acks=-1 retries=3 batch.size=323840 linger.ms=10 buffer.memory=33554432 max.block.ms=3000
其中,前三個引數必須明確指定,因為這三個引數沒有預設值(注:kafka的producer引數配置可以參考http://kafka.apache.org/documentation/),然後編寫producer傳送訊息的程式碼:
/** * Kafka傳送訊息測試 * @throws IOException */ public void sendMsg() throws IOException { //1.構造properties物件 Properties properties = new Properties(); FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties"); properties.load(fileInputStream); fileInputStream.close(); //2.構造kafkaProducer物件 KafkaProducer producer = new KafkaProducer(properties); for (int i = 0; i < 100; i++) { //3.構造待發送訊息的producerRecord物件,並指定訊息要傳送到哪個topic,訊息的key和value ProducerRecord testTopic = new ProducerRecord("testTopic",Integer.toString(i),Integer.toString(i)); //4.呼叫kafkaProducer物件的send方法傳送訊息 producer.send(testTopic); } //5.關閉kafkaProducer producer.close(); }
然後登陸kafka所在伺服器,執行以下命令監聽訊息:
cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9094 --topic testTopic --from-beginning
執行sendMsg方法,注意觀察消費端,
可以看到有0-99之間的數字依次被消費到,說明訊息傳送成功。
三、非同步和同步傳送訊息
上面傳送訊息的示例程式中,沒有對傳送結果進行處理,如果訊息傳送失敗我們也是無法得知的,這種方法在實際應用中是不推薦的。在實際使用場景中,一般使用非同步和同步兩種常見傳送方式。Java版本producer的send方法會返回一個Future物件,如果呼叫Future.get()方法就會無限等待返回結果,實現同步傳送的效果,否則就是非同步傳送。
1.非同步傳送訊息
Java版本producer的send()方法提供了回撥類引數來實現非同步傳送以及對傳送結果進行的響應,具體程式碼如下:
/** * 非同步傳送訊息 * * @throws IOException */ public void sendMsg() throws IOException { //1.構造properties物件 Properties properties = new Properties(); FileInputStream fileInputStream = new FileInputStream("F:\\javaCode\\jvmdemo\\src\\main\\resources\\kafka-producer.properties"); properties.load(fileInputStream); fileInputStream.close(); //2.構造kafkaProducer物件 KafkaProducer producer = new KafkaProducer(properties); for (int i = 0; i < 100; i++) { //3.構造待發送訊息的producerRecord物件,並指定訊息要傳送到哪個topic,訊息的key和value ProducerRecord testTopic = new ProducerRecord("testTopic",Integer.toString(i)); //4.呼叫kafkaProducer物件的send方法傳送訊息,傳入Callback回撥引數 producer.send(testTopic,new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata,Exception exception) { if (null == exception) { //訊息傳送成功後的處理 System.out.println("訊息傳送成功"); } else { //訊息傳送失敗後的處理 System.out.println("訊息傳送失敗"); } } }); } //5.關閉kafkaProducer producer.close(); }
以上程式碼中,send方法第二個引數傳入一個匿名內部類物件,也可以傳入實現了org.apache.kafka.clients.producer.Callback介面的類物件。同時onCompletion方法的兩個入參recordMetadata和exception不會同時為空,當訊息傳送成功後,exception為null,訊息傳送失敗後recordMetadata為null。因此可以按照兩個入參進行成功和失敗邏輯的處理。
其次,Kafka傳送訊息失敗的型別包含兩類,可重試異常和不可重試異常。所有的可重試異常都繼承自org.apache.kafka.common.errors.RetriableException抽象類,理論上所有沒有繼承RetriableException 類的其他異常都屬於不可重試異常,鑑於此,可以在訊息傳送失敗後,按照是否可以重試,來進行不同的處理邏輯處理:
//4.呼叫kafkaProducer物件的send方法傳送訊息 producer.send(testTopic,new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata,Exception exception) { if (null == exception) { //訊息傳送成功後的處理 System.out.println("訊息傳送成功"); } else { if(exception instanceof RetriableException){ // 可重試異常 System.out.println("可重試異常"); }else{ // 不可重試異常 System.out.println("不可重試異常"); } } } });
2.同步傳送訊息
同步傳送和非同步傳送是通過Java的Futrue來區分的,呼叫Future.get()無限等待結果返回,即實現了同步傳送的結果,具體程式碼如下:
// 傳送訊息 Future future = producer.send(testTopic); try { // 呼叫get方法等待結果返回,傳送失敗則會丟擲異常 future.get(); } catch (Exception e) { System.out.println("訊息傳送失敗"); }
四、其他高階特性
1.訊息分割槽機制
kafka producer提供了分割槽策略以及分割槽器(partitioner)用於確定將訊息傳送到指定topic的哪個分割槽中。預設分割槽器根據murmur2演算法計算訊息key的雜湊值,然後對總分割槽數求模確認訊息要被髮送的目標分割槽號(這點讓我想起了redis叢集中key值的分配方法),這樣就確保了相同key的訊息被髮送到相同分割槽。若訊息沒有key值,將採用輪詢的方式確保訊息在topic的所有分割槽上均勻分配。
除了使用kafka預設的分割槽機制,也可以通過實現org.apache.kafka.clients.producer.Partitioner介面來自定義分割槽器,此時需要在構造KafkaProducer的 properties中增加partitioner.class來指明分割槽器實現類,如:partitioner.class=com.demo.service.CustomerPartitioner。
2.訊息序列化
在本篇開始的producer示例程式中,在構造KafkaProducer物件的時候,有兩個配置項
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=org.apache.kafka.common.serialization.StringSerializer
分別用於配置訊息key和value的序列化方式為String型別,除此之外,Kafka中還提供瞭如下預設的序列化器:
ByteArraySerializer:本質上什麼也不做,因為網路中傳輸就是以位元組傳輸的;
ByteBufferSerializer:序列化ByteBuffer訊息;
BytesSerializer:序列化kafka自定義的Bytes型別;
IntegerSerializer:序列化Integer型別;
DoubleSerializer:序列化Double型別;
LongSerializer:序列化Long型別;
如果要自定義序列化器,則需要實現org.apache.kafka.common.serialization.Serializer介面,並且將key.serializer和value.serializer配置為自定義的序列化器。
3.訊息壓縮
訊息壓縮可以顯著降低磁碟佔用以及頻寬佔用,從而有效提升I/O密集型應用效能,但是引入壓縮同時會消耗額外的CPU,因此壓縮是I/O效能和CPU資源的平衡。kafka目前支援3種壓縮演算法:CZIP,Snappy和LZ4,效能測試結果顯示三種壓縮演算法的效能如下:LZ4>>Snappy>GZIP,目前啟用LZ4進行訊息壓縮的producer的吞吐量是最高的。
預設情況下Kafka是不壓縮訊息的,但是可以通過在建立KafkaProducer 物件的時候設定producer端引數compression.type來開啟訊息壓縮,如配置compression.type=LZ4。那麼什麼時候開啟壓縮呢?首先判斷是否啟用壓縮的依據是I/O資源消耗與CPU資源消耗的對比,如果環境上I/O資源非常緊張,比如producer程式佔用了大量的網路頻寬或broker端的磁碟佔用率很高,而producer端的CPU資源非常富裕,那麼就可以考慮為producer開啟壓縮。
4.無訊息丟失配置
在使用KafkaProducer.send()方法傳送訊息的時候,其實是把訊息放入緩衝區中,再由一個專屬I/O執行緒負責從緩衝區提取訊息並封裝訊息到batch中,然後再發送出去。如果在I/O執行緒將訊息傳送出去之前,producer奔潰了,那麼所有的訊息都將丟失。同時,存在多訊息傳送時候由於網路抖動導致訊息亂序的問題,為了解決這兩個問題,可以通過在producer端以及broker端進行配置進行避免。
4.1 producer端配置
max.block.ms=3000:設定block的時長,當緩衝區被填滿或者metadata丟失時產生block,停止接收新的訊息;
acks=all:等待所有follower都響應了傳送訊息認為訊息傳送成功;
retries=Integer.MAX_VALUE:設定重試次數,設定一個比較大的值可以保證訊息不丟失;
max.in.flight.requests.per.connection=1:限制producer在單個broker連線上能夠傳送的未響應請求的數量,從而防止同topic統一分割槽下訊息亂序問題;
除了設定以上引數之外,在傳送訊息的時候,應該儘量使用帶有回撥引數的send方法來處理髮送結果,如果資料傳送失敗,則顯示呼叫KafkaProducer.close(0)方法來立即關閉producer,防止訊息亂序。
4.2 broker端配置
unclean.leader.election.enable=false:關閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader;
replication.factor>=3:至少使用3個副本儲存資料;
min.issync.replicas>1:控制某條訊息至少被寫入到ISR中多少個副本才算成功,當且僅當producer端acks引數設定為all或者-1時,該引數才有效。
最後,確保replication.factor>min.issync.replicas,如果兩者相等,那麼只要有一個副本掛掉,分割槽就無法工作,推薦配置replication.factor=min.issync.replicas+1。
關於producer端的開發就介紹到這兒,下一篇將介紹consumer端的開發。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。