kafka的安裝及使用方式
阿新 • • 發佈:2021-12-07
介紹:
kafka是高效能的訊息中介軟體,利用zookeeper做分散式協調,實現叢集化擴充套件
關鍵詞:topic,partition,replication,offset
安裝使用:
下載安裝包但是一定要注意版本問題不然springboot程式碼沒有反應,kafka-clients為kafka的broker版本號:
https://spring.io/projects/spring-kafka
低版本使用zookeeper做儲存,高版本使用自身做儲存metadata,命令有所差異。
啟動zookeeper命令:
bin/zookeeper-server-start.sh <-daemon 非互動式啟動> config/zookeeper.properties
修改server.properties配置,複製多個配置檔案:
listeners=PLAINTEXT://192.168.121.132:9092 (--暴露服務,否則連線超時)
broker.id=2
(以下同一個機器需要修改):
port=9094
log.dirs=/tmp/kafka-logs-2
啟動kafka命令:
bin/kafka-server-start.sh <-daemon 非互動式啟動> config/server.properties
停止命令:
bin/kafka-server-stop.sh config/server.properties
建立低版本topic:(replication-factor副本值):
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
查詢topic詳情:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic-name
查詢所有topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
修改topic引數配置:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
刪除topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
高版本:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic-name
建立生產者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
建立消費者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic topic-name --from-beginning
springboot中使用kafka的配置:
1 #kafka叢集配置 2 spring.kafka.bootstrap-servers=192.168.121.132:9092 3 #重試次數 4 spring.kafka.producer.retries=0 5 #應答級別:多少個分割槽副本備份完成時向生產者傳送ack確認(可選0,1,all/-1) 6 spring.kafka.producer.acks=0 7 #批量大小 8 spring.kafka.producer.batch-size=16384 9 #提交延時 10 spring.kafka.producer.properties.linger.ms=0 11 #當生產端積累的訊息達到batch-size或者接收資訊linger.ms後,生產者就會將訊息提交個kafka 12 #linger.ms為0表示每接收到一條資訊就會提交給kafka,這時候batch-size其實就沒有用了 13 #生產端緩衝區大小 14 spring.kafka.producer.buffer-memory=33554432 15 #kafka提供的序列化和反序列化類 16 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 17 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 18 19 ###############消費者配置########## 20 #預設的消費組id(同一訊息,可以被不同組重複使用) 21 spring.kafka.consumer.properties.group.id=defaultConsumerGroup 22 #是否自動提交offset 23 spring.kafka.consumer.enable-auto-commit=true 24 #提交offset延時(接收到資訊後多久提交offset) 25 spring.kafka.consumer.auto-commit-interval=1000 26 #當kafka中沒有初始offset或offset超出範圍將自動重置offset 27 #earliest:重置為分割槽中最小的offset 28 #latest:重置為分割槽中最新的offset(消費分割槽中新產生的資料) 29 #none:只要有一個分割槽不存在已提交的offset,就丟擲異常 30 spring.kafka.consumer.auto-offset-reset=latest 31 #消費會話超時時間(超過這個時間consumer沒有傳送心跳,就會觸發rebalance操作) 32 spring.kafka.consumer.properties.session.timeout.ms=120000 33 #消費請求超時時間 34 spring.kafka.consumer.properties.request.timeout.ms=180000 35 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 36 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer 37 #消費端監聽的topic不存在時,專案啟動會報錯 38 spring.kafka.listener.missing-topics-fatal=false
消費者:
@Component @Slf4j public class KafkaCustomerService { @KafkaListener(topics = "hello-world") //定義此消費者接收topic為“hello-world”的訊息,監聽伺服器上的kafka是否有相關的訊息發過來 //record變數代表訊息本身,可以通過ConsumerRecord<?,?>型別的record變數來列印接收的訊息的各種資訊 public void listen (ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value()); } }
生產者:
@Service @Slf4j public class KafkaProductService { @Autowired KafkaTemplate kafkaTemplate; private void sendMsg(){ kafkaTemplate.send("hello-world","welcom to kafka"); } }