1. 程式人生 > 其它 >kafka的安裝及使用方式

kafka的安裝及使用方式

介紹:
  kafka是高效能的訊息中介軟體,利用zookeeper做分散式協調,實現叢集化擴充套件
  關鍵詞:topic,partition,replication,offset
安裝使用:
  下載安裝包但是一定要注意版本問題不然springboot程式碼沒有反應,kafka-clients為kafka的broker版本號:

https://spring.io/projects/spring-kafka

  低版本使用zookeeper做儲存,高版本使用自身做儲存metadata,命令有所差異。
  啟動zookeeper命令:
    bin/zookeeper-server-start.sh <-daemon 非互動式啟動> config/zookeeper.properties
  修改server.properties配置,複製多個配置檔案:


    listeners=PLAINTEXT://192.168.121.132:9092 (--暴露服務,否則連線超時)
    broker.id=2
    (以下同一個機器需要修改):
    port=9094
    log.dirs=/tmp/kafka-logs-2
  啟動kafka命令:
    bin/kafka-server-start.sh <-daemon 非互動式啟動> config/server.properties
  停止命令:
    bin/kafka-server-stop.sh config/server.properties
  建立低版本topic:(replication-factor副本值):

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
  查詢topic詳情:
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic-name
  查詢所有topic:
    bin/kafka-topics.sh --list --zookeeper localhost:2181
  修改topic引數配置:
    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
  刪除topic:

    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
  高版本:
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic-name
  建立生產者:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
  建立消費者:
    bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic topic-name --from-beginning

springboot中使用kafka的配置:

 1 #kafka叢集配置
 2 spring.kafka.bootstrap-servers=192.168.121.132:9092
 3 #重試次數
 4 spring.kafka.producer.retries=0
 5 #應答級別:多少個分割槽副本備份完成時向生產者傳送ack確認(可選0,1,all/-1 6 spring.kafka.producer.acks=0
 7 #批量大小
 8 spring.kafka.producer.batch-size=16384
 9 #提交延時
10 spring.kafka.producer.properties.linger.ms=0
11 #當生產端積累的訊息達到batch-size或者接收資訊linger.ms後,生產者就會將訊息提交個kafka
12 #linger.ms為0表示每接收到一條資訊就會提交給kafka,這時候batch-size其實就沒有用了
13 #生產端緩衝區大小
14 spring.kafka.producer.buffer-memory=33554432
15 #kafka提供的序列化和反序列化類
16 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
17 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
18 
19 ###############消費者配置##########
20 #預設的消費組id(同一訊息,可以被不同組重複使用)
21 spring.kafka.consumer.properties.group.id=defaultConsumerGroup
22 #是否自動提交offset
23 spring.kafka.consumer.enable-auto-commit=true
24 #提交offset延時(接收到資訊後多久提交offset)
25 spring.kafka.consumer.auto-commit-interval=1000
26 #當kafka中沒有初始offset或offset超出範圍將自動重置offset
27 #earliest:重置為分割槽中最小的offset
28 #latest:重置為分割槽中最新的offset(消費分割槽中新產生的資料)
29 #none:只要有一個分割槽不存在已提交的offset,就丟擲異常
30 spring.kafka.consumer.auto-offset-reset=latest
31 #消費會話超時時間(超過這個時間consumer沒有傳送心跳,就會觸發rebalance操作)
32 spring.kafka.consumer.properties.session.timeout.ms=120000
33 #消費請求超時時間
34 spring.kafka.consumer.properties.request.timeout.ms=180000
35 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
36 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
37 #消費端監聽的topic不存在時,專案啟動會報錯
38 spring.kafka.listener.missing-topics-fatal=false

  消費者:

@Component
@Slf4j
public class KafkaCustomerService {
@KafkaListener(topics = "hello-world")
//定義此消費者接收topic為“hello-world”的訊息,監聽伺服器上的kafka是否有相關的訊息發過來
//record變數代表訊息本身,可以通過ConsumerRecord<?,?>型別的record變數來列印接收的訊息的各種資訊
public void listen (ConsumerRecord<?, ?> record) throws Exception 
  {
         System.out.printf("topic = %s, offset = %d, value = %s \n",     
         record.topic(), record.offset(), record.value());
    }

}

生產者:

@Service
@Slf4j
public class KafkaProductService {
@Autowired
KafkaTemplate kafkaTemplate;
private void sendMsg(){
    kafkaTemplate.send("hello-world","welcom to kafka");
  }
}