Kafka (1): Getting Started
阿新 · Published 2019-02-19
Kafka, like ActiveMQ and RocketMQ, is a kind of message middleware (message broker).
Step 1: Download the Kafka package
Download kafka_2.11-1.1.0.tgz from https://kafka.apache.org/downloads, then unpack it:
>tar -zxvf kafka_2.11-1.1.0.tgz
>cd kafka_2.11-1.1.0
Step 2: Start Kafka
Kafka depends on ZooKeeper. If you do not have a ZooKeeper installation, you can use the single-node ZooKeeper bundled with Kafka, configured in zookeeper.properties:
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
The bundled ZooKeeper listens on port 2181 by default.
Start the bundled ZooKeeper:
>bin/zookeeper-server-start.sh config/zookeeper.properties
INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Start Kafka
First, look at the main configuration items in Kafka's server.properties:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
broker.id identifies each broker; it must be unique per broker so brokers can be told apart.
num.network.threads is the number of threads the broker uses to receive requests and send responses over the network.
num.io.threads is the number of threads that process requests, including any disk I/O they require.
log.dirs is the directory (or comma-separated list of directories) where Kafka stores its log data files.
zookeeper.connect is the ip:port of the ZooKeeper ensemble.
zookeeper.connection.timeout.ms is the timeout, in milliseconds, for connecting to ZooKeeper.
If clients on other machines need to reach Kafka, configure listeners:
listeners=PLAINTEXT://[your.host.name]:9092
Then start the Kafka server:
>bin/kafka-server-start.sh config/server.properties
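server.properties uses the Java-style key=value format shown above. As a minimal sketch (not part of Kafka, and ignoring escapes that the full format allows), reading the key settings out of such a file takes only a few lines of stdlib Python:

```python
# Parse a minimal Java-style .properties text: one key=value per line,
# blank lines and '#' comments skipped. Values are kept as strings.
def load_properties(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """
# The id of the broker.
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
"""
conf = load_properties(sample)
print(conf["broker.id"], conf["zookeeper.connect"])  # → 0 localhost:2181
```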
Step 3: Create a topic
Create a topic named test with a single partition and a single replica:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Check it with the list command:
>bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Step 4: Send messages
Kafka ships with a command line producer that sends messages to the Kafka cluster. It reads input from a file or from standard input; by default, each line is sent as a separate message.
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
first message
second message
Step 5: Start a consumer
Kafka also provides a command line consumer that dumps a topic's messages to standard output:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
first message
second message
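The behavior of the two console tools above can be sketched with a toy in-memory "topic" (an illustration of the semantics only, not Kafka's client API): the producer appends each input line as one message to an ordered log, and a consumer started with --from-beginning replays every retained message from offset 0.

```python
# A toy in-memory topic: an append-only log of messages.
class ToyTopic:
    def __init__(self, name):
        self.name = name
        self.log = []  # the append-only message log

    def produce(self, text):
        # Like kafka-console-producer: each input line becomes one message.
        for line in text.splitlines():
            self.log.append(line)

    def consume_from_beginning(self):
        # Like --from-beginning: start at offset 0 and read the whole log.
        return list(self.log)

topic = ToyTopic("test")
topic.produce("first message\nsecond message")
print(topic.consume_from_beginning())  # → ['first message', 'second message']
```

Note that unlike a traditional queue, consuming does not remove messages: the log is retained, which is why a second consumer started later with --from-beginning sees the same messages again.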
All of the command line tools accept additional options; running any of them without arguments prints detailed usage information.
Step 6: Set up a multi-broker cluster
So far we have been running Kafka as a single-broker cluster. Let's expand it to three nodes. One broker is already running, so we only need two more; first, write the two new brokers' config files:
server-1-9093.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1-9093
server-2-9094.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2-9094
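Each override file differs only in the broker id, listener port, and log directory, so generating them is mechanical. A small helper sketch (hypothetical, not part of Kafka; it assumes the id-and-port naming used above):

```python
# Build the per-broker override file contents from a broker id and port.
def broker_config(broker_id, port):
    return (f"broker.id={broker_id}\n"
            f"listeners=PLAINTEXT://:{port}\n"
            f"log.dir=/tmp/kafka-logs-{broker_id}-{port}\n")

print(broker_config(1, 9093))
```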
broker.id is the unique identifier of each node in the cluster. Start the two new nodes:
>bin/kafka-server-start.sh config/server-1-9093.properties &
>bin/kafka-server-start.sh config/server-2-9094.properties &
INFO Awaiting socket connections on 0.0.0.0:9094. (kafka.network.Acceptor)...
INFO Creating /brokers/ids/2 (is it secure? false) (kafka.zk.KafkaZkClient)
...
INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
The log above is from the broker on port 9094: the node with id=2 started successfully and registered itself under the ZooKeeper path /brokers/ids/2.
Every broker in the cluster registers itself under the ZooKeeper path /brokers/ids:
[zk: 127.0.0.1:2181(CONNECTED) 13] ls /brokers/ids
[2, 1, 0]
For example, broker id 0:
[zk: 127.0.0.1:2181(CONNECTED) 15] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],
"jmx_port":-1,"host":"localhost","timestamp":"1529319636438","port":9092,"version":4}
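The registration znode's value is plain JSON, so it can be inspected with Python's standard json module; a short sketch using the payload shown above:

```python
import json

# The JSON stored at /brokers/ids/0, as printed by the ZooKeeper CLI above.
znode = ('{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},'
         '"endpoints":["PLAINTEXT://localhost:9092"],'
         '"jmx_port":-1,"host":"localhost",'
         '"timestamp":"1529319636438","port":9092,"version":4}')

info = json.loads(znode)
print(info["endpoints"][0])  # → PLAINTEXT://localhost:9092
print(info["port"])          # → 9092
```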
Create a topic named my-replicated-topic with three replicas and one partition:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
We now have a cluster, but how do we know which broker is responsible for what? Use the "describe topics" command:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
The first line is a summary of all partitions; each following line describes one partition. Since this topic has only one partition, there is just one such line.
"Partition" is the partition number, starting from 0;
"Leader" is the broker (id) that handles all reads and writes for the partition; each broker is the leader for a randomly selected share of the partitions;
"Replicas" is the full list of brokers that replicate this partition's log, regardless of whether they are the leader or even currently alive;
"Isr" is the set of "in-sync" replicas: the subset of "Replicas" that is currently alive and caught up to the leader, one of which acts as the leader.
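Scripts often need these fields programmatically; a small sketch (not part of Kafka) that parses one partition line of the describe output into a dictionary:

```python
# Parse one "Partition: ... Leader: ... Replicas: ... Isr: ..." line.
# Tokens alternate between "Key:" and value, so pair them up.
def parse_partition_line(line):
    tokens = line.split()
    fields = {k.rstrip(":"): v for k, v in zip(tokens[::2], tokens[1::2])}
    fields["Partition"] = int(fields["Partition"])
    fields["Leader"] = int(fields["Leader"])
    for key in ("Replicas", "Isr"):
        # Replicas and Isr are comma-separated broker id lists.
        fields[key] = [int(b) for b in fields[key].split(",")]
    return fields

info = parse_partition_line(
    "Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0")
print(info["Leader"], info["Isr"])  # → 1 [1, 2, 0]
```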
First, publish a few messages to the new topic:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Next, consume these messages:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's test the cluster's fault tolerance. Broker 1 is currently the leader, so find its process and kill it:
>ps aux | grep server-1-9093.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
>kill -9 7564
We have manually stopped the leader, broker 1. Kafka elects a new leader from the surviving followers; since broker 1 is no longer alive, it drops out of the "Isr" set.
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
After broker 1 is stopped, the "Replicas" list is unchanged, but the "Isr" set no longer contains 1; only 2 and 0 remain, and the new leader was chosen from among them.
Reference: https://kafka.apache.org/quickstart
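The failover behavior just observed can be sketched as a toy function (an illustration of the idea only, not Kafka's actual election algorithm): the dead broker leaves the ISR, a surviving in-sync replica is promoted to leader, and the replica assignment itself stays the same.

```python
# Simulate a broker failure for one partition.
def fail_broker(dead, leader, replicas, isr):
    isr = [b for b in isr if b != dead]  # dead broker drops out of the ISR
    if leader == dead:
        leader = isr[0]                  # promote a surviving in-sync replica
    return leader, replicas, isr         # the replica list itself is unchanged

# Before the failure: Leader: 1, Replicas: 1,2,0, Isr: 1,2,0 (as described above).
leader, replicas, isr = fail_broker(dead=1, leader=1, replicas=[1, 2, 0], isr=[1, 2, 0])
print(leader, replicas, isr)  # → 2 [1, 2, 0] [2, 0]
```

This matches the describe output after the kill: Leader: 2, Replicas: 1,2,0, Isr: 2,0.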