1. 程式人生 > >kafka(一)入門

kafka(一)入門

kafka跟activemq,rocketmq類似,也是其中一種訊息中介軟體。

Step1:下載kafka包

https://kafka.apache.org/downloads 下載 kafka_2.11-1.1.0.tgz
>tar -zxvf kafka_2.11-1.1.0.tgz
>cd kafka_2.11-1.1.0

Step2:啟動kafka

kafka需要用到zookeeper,如果沒有zookeeper,可以使用kafka自帶的單節點zookeeper,
zookeeper.properties:
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
kafka自帶的zookeeper預設是2181埠。
啟動自帶zookeeper:
>bin/zookeeper-server-start.sh config/zookeeper.propertie
INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

啟動kafka
先看看kafka的配置檔案server.properties幾個主要的配置項
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
broker.id設定kafka broker的id,以區分不同的broker
num.network.threads為處理broker接收請求和傳送響應的執行緒數
num.io.threads為處理請求,包含需要磁碟I/O操作時的執行緒數
log.dirs指明日誌檔案的路徑
zookeeper.connect配置zookeeper的ip:port
zookeeper.connection.timeout.ms為zookeeper超時時重連的時間(毫秒)

如果要在其他機器呼叫kafka,則需要配置listeners:

listeners = PLAINTEXT://[your.host.name]:9092
啟動
>bin/kafka-server-start.sh config/server.properties

Step3:建立topic

建立只有一個分割槽()和只有一個副本的topic test
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
通過list命令檢視topic
>bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Step4:傳送訊息

kafka可以通過命令列客戶端將訊息傳送給kafka叢集,客戶端可以通過一個檔案或者標準輸入作為訊息輸入來源,預設來說,一個換行就一條訊息。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
first message
second message

Step5:啟動一個客戶端

kafka提供一個命令列客戶端查詢出topic的訊息
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
first message
second message
所有命令列工具都有其他的選項引數,執行命令,如果沒帶引數的情況下,則會有提供命令的詳細使用選項引數。

Step6:建立多broker叢集

通過前面幾個Step,我們已經建立了kafka,不過kafka叢集只有一個節點broker的,下面,我們擴充套件到三個節點。
之前已經建立了一個broker,我們再建立2個broker即可,先來編輯2個broker的配置檔案
server-1-9093.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1-9093
server-2-9094.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2-9094
broker.id是每個節點在叢集中的唯一標識id。
啟動兩個節點:
>bin/kafka-server-start.sh config/server-1-9093.properties &
>bin/kafka-server-start.sh config/server-2-9094.properties &
INFO Awaiting socket connections on 0.0.0.0:9094. (kafka.network.Acceptor)
...
INFO Creating /brokers/ids/2 (is it secure? false) (kafka.zk.KafkaZkClient)
...
INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
上面是9094節點啟動的日誌,id=2的節點啟動成功,並註冊為zookeeper的目錄/brokers/ids/2

所以叢集的節點都註冊到zookeeper的/brokers/ids目錄下
[zk: 127.0.0.1:2181(CONNECTED) 13] ls /brokers/ids
[2, 1, 0]
比如brokers-id:0
[zk: 127.0.0.1:2181(CONNECTED) 15] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],
"jmx_port":-1,"host":"localhost","timestamp":"1529319636438","port":9092,"version":4}
建立有3個副本,一個分割槽的topic:my-replicated-topic
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
我們已經建立了一個kafka的叢集,不過怎麼知道叢集的各個節點各自負責什麼職責?可以通過“describe topics”命令得知:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
第一行是所有分割槽的總概況,此外額外第一行描述一個分割槽的資訊,由於分割槽數為1,所以從第二行起只有一行。
"Partition" 第1個分割槽,這裡是從0開始;
"Leader" 對指定的分割槽的所有讀和寫響應,都是該leader(id)處理,選哪個節點作為特定分割槽的Leader隨機的;
"Replicas" 指定分割槽日誌的副本的所有節點列表,這些節點並不區分它們是Leader還是存活的節點;
"Isr" 是“同步”副本的集合,"Isr"是副本的子集,它們當前是存活狀態的並使其中1個作為Leader;

首先發布一些訊息到topic:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
接著我們消費topic的這些訊息
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
我們現在測試kafka叢集的容錯,當前叢集是broker 1是Leader
ps aux | grep server-1-9093.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
kill -9 7564
我們通過手動停止Leader節點broker 1。

kafka會在存活狀態的從節點中選一個節點將它切換到Leader,由於broker 1已經被停止,不再是存活狀態,所以它不會在"Isr"的集合中。
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
我們看到broker 1停止後,"Replicas"的列表是不變的,但"Isr"的集合中不會再包含1只剩下2和0,所以新的Leader是從2,0的集合中隨機選出來的。


https://kafka.apache.org/quickstart