1. 程式人生 > 其它 >初識kafka 之 吞吐量控制

初識kafka 之 吞吐量控制

buffer.memory 緩衝區大小,預設 32M

properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

batch.size 批次大小,預設16k

properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

linger.ms 等待時間,一般為1到100ms,預設0。

properties.put(ProducerConfig.LINGER_MS_CONFIG,1);

compression.type 壓縮型別,預設 none,可配置值 gzip、snappy、lz4 和 zstd

properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

示例

package com.lzh.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// 本地idea通過kafka傳送訊息
public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.配置 Properties properties = new Properties(); // 連線叢集 // 給kafka物件新增配置資訊 bootstrap.servers // 生產者連線叢集所需的 broker 地址清單 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092");
// 指定傳送訊息的key和value的序列化型別。一定要寫全類名。 // key,value序列化 key.serializer,value.serializer // key序列化 // 全類名與下等價: properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // buffer.memory 緩衝區大小,預設 32M properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); // batch.size 批次大小,預設16k properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); // linger.ms 等待時間,一般為1到100ms,預設0。 properties.put(ProducerConfig.LINGER_MS_CONFIG,1); // compression.type 壓縮型別,預設 none,可配置值 gzip、snappy、lz4 和 zstd properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); // 2.建立 kafka 生產者物件 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 3.傳送資料 // 呼叫 send 方法,傳送訊息 for (int i = 1; i < 5; i++) { // 非同步傳送,無回撥函式 if (i == 1) { kafkaProducer.send(new ProducerRecord<String, String>("Mytopic","非同步傳送,無回撥函式\n開始報數:" )); // 無回撥函式 } // 非同步傳送,有回撥函式 kafkaProducer.send(new ProducerRecord<String, String>("Mytopic", "" + i), new Callback() { // 有回撥函式 public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("主題"+ recordMetadata.topic() +"已非同步傳送訊息到"+ recordMetadata.partition() +"分割槽"); } } }); } // 同步傳送,無回撥函式 get() kafkaProducer.send(new ProducerRecord<String, String>("Mytopic","同步傳送,無回撥函式" )).get(); // 無回撥函式 // 同步傳送,有回撥函式 get() kafkaProducer.send(new ProducerRecord<String, String>("Mytopic", "同步傳送,有回撥函式"), new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("主題"+ recordMetadata.topic() +"已同步傳送訊息到"+ recordMetadata.partition() +"分割槽"); } } }).get(); // 4.關閉資源 kafkaProducer.close(); } }
本地idea通過kafka傳送訊息