Installing and Using Kafka
阿新 • Published: 2019-01-04
Choose a package from Binary downloads:
Extract the archive and enter the directory:
admindeMacBook-Pro:Tools yyc$ cd kafka_2.11-1.1.0/
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ ls
LICENSE bin libs
NOTICE config site-docs
Step 1. Start ZooKeeper
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-07-19 12:24:42,461] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-07-19 12:24:42,464] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-07-19 12:24:42,464] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
Start Kafka:
admindeMacBook-Pro:Tools yyc$ cd kafka_2.11-1.1.0/
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-server-start.sh config/server.properties &
[1] 1604
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ [2018-07-19 12:27:27,288] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
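Starting the broker with & leaves its log output attached to the current terminal, where it mixes with whatever is typed next. A minimal sketch of an alternative, assuming the -daemon option of the stock start scripts; the KAFKA_HOME variable and the start_daemon helper are names made up for this example:

```shell
# Assumption: KAFKA_HOME is the extracted kafka_2.11-1.1.0 directory.
KAFKA_HOME="${KAFKA_HOME:-.}"

# start_daemon SCRIPT CONFIG   (hypothetical helper)
# Runs one of the start scripts detached from the terminal via -daemon.
# SCRIPT is looked up in $KAFKA_HOME/bin, CONFIG in $KAFKA_HOME/config.
start_daemon() {
  "$KAFKA_HOME/bin/$1" -daemon "$KAFKA_HOME/config/$2"
}
```

It could be called as start_daemon zookeeper-server-start.sh zookeeper.properties followed by start_daemon kafka-server-start.sh server.properties. In daemon mode the output should end up in files under the logs directory rather than in the terminal.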
Step 2. Create a topic
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[2018-07-19 12:32:17,512] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-0 (kafka.server.ReplicaFetcherManager)
[2018-07-19 12:32:17,543] INFO [Log partition=test-0, dir=/tmp/kafka-logs] Loading producer state from offset 0 with message format version 2 (kafka.log.Log)
List the created topics:
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Step 3. Send messages
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a me[2018-07-19 12:37:28,352] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
^R
this is a message
>th[2018-07-19 12:37:36,269] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: test-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
^R
>
>this is a new message
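The console producer reads one message per line from standard input, so messages can also be piped in instead of typed at the > prompt. A rough sketch under that assumption; KAFKA_HOME and the send_messages helper are illustrative names, and the broker address is the one used above:

```shell
# Assumption: KAFKA_HOME is the extracted kafka_2.11-1.1.0 directory.
KAFKA_HOME="${KAFKA_HOME:-.}"

# send_messages TOPIC MESSAGE...   (hypothetical helper)
# Sends every MESSAGE argument as its own record to TOPIC.
send_messages() {
  topic="$1"
  shift
  # Nothing to send: avoid producing a single empty message.
  [ "$#" -gt 0 ] || return 0
  printf '%s\n' "$@" |
    "$KAFKA_HOME/bin/kafka-console-producer.sh" \
      --broker-list localhost:9092 \
      --topic "$topic"
}
```

For example, send_messages test "this is a message" "this is a new message" would send the same two messages as the interactive session.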
Receive messages:
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
this is a message
this is a new message
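The deprecation warning above suggests passing the broker address instead of the ZooKeeper address. A minimal sketch of that form, assuming the broker listens on localhost:9092 as in the earlier steps; KAFKA_HOME and consume_from_beginning are names invented for this example:

```shell
# Assumption: KAFKA_HOME is the extracted kafka_2.11-1.1.0 directory.
KAFKA_HOME="${KAFKA_HOME:-.}"

# consume_from_beginning TOPIC [BOOTSTRAP_SERVER]   (hypothetical helper)
# Reads TOPIC from the first offset using the new consumer.
consume_from_beginning() {
  topic="$1"
  server="${2:-localhost:9092}"
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --bootstrap-server "$server" \
    --topic "$topic" \
    --from-beginning
}
```

Calling consume_from_beginning test should print the same two messages, without the warning.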
Step 4. Set up a multi-broker cluster
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ cp config/server.properties config/server-1.properties
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ cp config/server.properties config/server-2.properties
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$
Edit the configuration files:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
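The copy-and-edit step can also be scripted. The following is only a sketch: make_broker_config is a hypothetical helper, and it assumes the base file already contains a broker.id line, a listeners line (possibly commented out) and a log.dir or log.dirs line. It always writes the log.dirs key, on the assumption that log.dirs is the key the stock server.properties uses.

```shell
# make_broker_config BASE_FILE BROKER_ID PORT   (hypothetical helper)
# Writes server-<BROKER_ID>.properties next to BASE_FILE, changing only
# the three per-broker settings and copying every other line unchanged.
make_broker_config() {
  base="$1"
  id="$2"
  port="$3"
  out="$(dirname "$base")/server-$id.properties"
  sed \
    -e "s|^broker\.id=.*|broker.id=$id|" \
    -e "s|^#\{0,1\}listeners=.*|listeners=PLAINTEXT://:$port|" \
    -e "s|^log\.dirs\{0,1\}=.*|log.dirs=/tmp/kafka-logs-$id|" \
    "$base" > "$out"
}
```

With this, make_broker_config config/server.properties 1 9093 and make_broker_config config/server.properties 2 9094 would produce the two files described above.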
Run:
ZooKeeper and the first Kafka node are already running, so we only need to start the two new Kafka nodes.
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
Create a new topic with a replication factor of 3:
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
[2018-07-19 14:06:43,529] INFO Accepted socket connection from /0:0:0:0:0:0:0:1:56030 (org.apache.zookeeper.server.NIOServerCnxnFactory)
Check what each broker is doing:
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
[2018-07-19 14:09:56,044] INFO Accepted socket connection from /127.0.0.1:56137 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2018-07-19 14:09:56,051] INFO Client attempting to establish new session at /127.0.0.1:56137 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-07-19 14:09:56,052] INFO Established session 0x164b12214f90005 with negotiated timeout 30000 for client /127.0.0.1:56137 (org.apache.zookeeper.server.ZooKeeperServer)
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
[2018-07-19 14:09:56,262] INFO Processed session termination for sessionid: 0x164b12214f90005 (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-07-19 14:09:56,264] INFO Closed socket connection for client /127.0.0.1:56137 which had sessionid 0x164b12214f90005 (org.apache.zookeeper.server.NIOServerCnxn)
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$
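In the partition line, Leader is the broker that handles reads and writes for the partition, Replicas lists the brokers that hold a copy of its log, and Isr lists the replicas that are currently in sync. Because ZooKeeper log lines are mixed into the output here, a small filter can help pull one field out. This is a sketch only; describe_field is a made-up helper name:

```shell
# describe_field NAME   (hypothetical helper)
# Reads `kafka-topics.sh --describe` output on stdin and prints the value
# that follows "NAME:" on each partition line (for example Leader or Isr).
# Lines without that field, such as log lines, print nothing.
describe_field() {
  awk -v key="$1:" '{
    for (i = 1; i < NF; i++)
      if ($i == key) print $(i + 1)
  }'
}
```

For example, piping the --describe command above into describe_field Leader would print only the leader's broker id.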
Publish new messages to the topic:
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>message 1
>mes[2018-07-19 14:16:42,241] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: my-replicated-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-07-19 14:16:42,254] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: my-replicated-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2018-07-19 14:16:42,255] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: my-replicated-topic-0. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
^R
me
>message2
>^CadmindeMacBook-Pro:kafka_2.11-1.1.0 yyc$
Consume the messages:
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
Test fault tolerance:
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ ps | grep server-1.properties
4179 ttys001 0:16.61 /usr/bin/java
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ kill -9 4179
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ [2018-07-19 14:21:50,164] WARN caught end of stream exception (org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client sessionid 0x164b12214f90001, likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
at java.lang.Thread.run(Thread.java:748)
[2018-07-19 14:21:50,165] INFO Closed socket connection for client /127.0.0.1:55971 which had sessionid 0x164b12214f90001 (org.apache.zookeeper.server.NIOServerCnxn)
[3]- Killed: 9 bin/kafka-server-start.sh config/server-1.properties
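After a broker is killed, re-running the --describe command from earlier should, after a short delay, show that broker missing from Isr while it still appears in Replicas. A hedged sketch of a check for this; in_isr is a hypothetical helper that only parses the describe output and does not talk to Kafka itself:

```shell
# in_isr BROKER_ID   (hypothetical helper)
# Reads `kafka-topics.sh --describe` output on stdin. Succeeds only if at
# least one partition line was seen and BROKER_ID is in every Isr list.
in_isr() {
  awk -v id="$1" '
    {
      for (i = 1; i < NF; i++) {
        if ($i == "Isr:") {
          seen = 1
          found = 0
          n = split($(i + 1), members, ",")
          for (j = 1; j <= n; j++)
            if (members[j] == id) found = 1
          if (!found) missing = 1
        }
      }
    }
    END { if (seen && !missing) exit 0; exit 1 }
  '
}
```

A possible use is bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic | in_isr 1 || echo "broker 1 is not in sync", which would be expected to print the message once the killed broker has dropped out.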