Installing and Using Kafka

On the download page, pick a package under Binary downloads:

Extract the archive and enter the directory:

admindeMacBook-Pro:Tools yyc$ cd kafka_2.11-1.1.0/
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ ls
LICENSE		bin		libs
NOTICE		config		site-docs

Step 1. Start ZooKeeper

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/zookeeper-server-start.sh config/zookeeper.properties 
[2018-07-19 12:24:42,461] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2018-07-19 12:24:42,464] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2018-07-19 12:24:42,464] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)

Start Kafka:

admindeMacBook-Pro:Tools yyc$ cd kafka_2.11-1.1.0/
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-server-start.sh config/server.properties &
[1] 1604
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ [2018-07-19 12:27:27,288] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

Step 2. Create a topic

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[2018-07-19 12:32:17,512] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-0 (kafka.server.ReplicaFetcherManager)
[2018-07-19 12:32:17,543] INFO [Log partition=test-0, dir=/tmp/kafka-logs] Loading producer state from offset 0 with message format version 2 (kafka.log.Log)

List the created topics:

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Step 3. Send messages

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
>this is a message
>
>
>this is a new message

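Receive messages (the --zookeeper option used below is deprecated, as the warning in the output points out; the new consumer takes --bootstrap-server localhost:9092 instead):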

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
this is a message


this is a new message

Step 4. Set up a multi-broker cluster

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ cp config/server.properties config/server-1.properties 
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ cp config/server.properties config/server-2.properties 
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ 

Edit the configuration files:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
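
The same edits can be scripted instead of made by hand. Below is a minimal sketch, not a definitive procedure. It assumes the shipped config/server.properties contains the lines broker.id=0, a commented-out #listeners=PLAINTEXT://:9092, and log.dirs=/tmp/kafka-logs (log.dirs being the key the stock file uses for the data directory). The function name make_broker_config is hypothetical, and the demo works on a stand-in file in a scratch directory rather than on a real Kafka installation.

```shell
#!/bin/sh
# make_broker_config SRC ID PORT
# Hypothetical helper: derive server-ID.properties from SRC by rewriting
# the broker id, the listener port, and the log directory.
make_broker_config() {
  src=$1; id=$2; port=$3
  out="$(dirname "$src")/server-$id.properties"
  sed -e "s|^broker\.id=.*|broker.id=$id|" \
      -e "s|^#*listeners=.*|listeners=PLAINTEXT://:$port|" \
      -e "s|^log\.dirs=.*|log.dirs=/tmp/kafka-logs-$id|" \
      "$src" > "$out"
  echo "$out"
}

# Demo against a stand-in file (assumed to mirror the relevant stock lines).
demo_dir=$(mktemp -d)
printf '%s\n' 'broker.id=0' '#listeners=PLAINTEXT://:9092' \
  'log.dirs=/tmp/kafka-logs' 'num.partitions=1' > "$demo_dir/server.properties"

make_broker_config "$demo_dir/server.properties" 1 9093
make_broker_config "$demo_dir/server.properties" 2 9094
cat "$demo_dir/server-1.properties"
```

Each broker needs its own id, port, and data directory because all three run on the same machine here; lines that are not matched (such as num.partitions) are copied through unchanged.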

Run:

ZooKeeper and the first Kafka node are already running, so we only need to start the two new Kafka nodes.

> bin/kafka-server-start.sh config/server-1.properties &
... 
> bin/kafka-server-start.sh config/server-2.properties &
...

Create a new topic with a replication factor of 3:

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
[2018-07-19 14:06:43,529] INFO Accepted socket connection from /0:0:0:0:0:0:0:1:56030 (org.apache.zookeeper.server.NIOServerCnxnFactory)

Check what each broker is doing:

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
[2018-07-19 14:09:56,044] INFO Accepted socket connection from /127.0.0.1:56137 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2018-07-19 14:09:56,051] INFO Client attempting to establish new session at /127.0.0.1:56137 (org.apache.zookeeper.server.ZooKeeperServer)
[2018-07-19 14:09:56,052] INFO Established session 0x164b12214f90005 with negotiated timeout 30000 for client /127.0.0.1:56137 (org.apache.zookeeper.server.ZooKeeperServer)
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
Topic: my-replicated-topic	Partition: 0	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1
[2018-07-19 14:09:56,262] INFO Processed session termination for sessionid: 0x164b12214f90005 (org.apache.zookeeper.server.PrepRequestProcessor)
[2018-07-19 14:09:56,264] INFO Closed socket connection for client /127.0.0.1:56137 which had sessionid 0x164b12214f90005 (org.apache.zookeeper.server.NIOServerCnxn)
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ 
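
In the partition line above, Leader is the broker that handles reads and writes for the partition, Replicas lists every broker holding a copy, and Isr is the subset of replicas currently in sync with the leader. Since the ZooKeeper log lines get mixed into the output, it can help to pull just those fields out. The following is a rough sketch using awk; the function name partition_status is hypothetical, the sample line is copied from the output above, and the whitespace-separated "Key: value" layout is assumed to match this Kafka version.

```shell
#!/bin/sh
# partition_status: read `kafka-topics.sh --describe` output on stdin and
# print one summary line per partition (hypothetical helper).
partition_status() {
  awk '
    {
      leader = ""; replicas = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:")   leader   = $(i + 1)
        if ($i == "Replicas:") replicas = $(i + 1)
        if ($i == "Isr:")      isr      = $(i + 1)
      }
      if (leader == "") next   # skip the topic summary line and log lines
      n_rep = split(replicas, r, ",")
      n_isr = split(isr, s, ",")
      printf "leader=%s replicas=%d in_sync=%d under_replicated=%s\n",
             leader, n_rep, n_isr, (n_isr < n_rep ? "yes" : "no")
    }'
}

# Sample partition line taken from the describe output shown above.
sample=$(printf 'Topic: my-replicated-topic\tPartition: 0\tLeader: 0\tReplicas: 0,2,1\tIsr: 0,2,1')
result=$(printf '%s\n' "$sample" | partition_status)
echo "$result"   # prints: leader=0 replicas=3 in_sync=3 under_replicated=no
```

Against a live cluster the describe command's output would be piped straight into partition_status; a partition whose Isr list is shorter than its Replicas list is reported as under-replicated.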

Publish new messages to the topic:

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>message 1
>me
>message2
>^C

Read the messages:

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

Test fault tolerance:

admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ ps | grep server-1.properties
 4179 ttys001    0:16.61 /usr/bin/java 
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ kill -9 4179
admindeMacBook-Pro:kafka_2.11-1.1.0 yyc$ [2018-07-19 14:21:50,164] WARN caught end of stream exception (org.apache.zookeeper.server.NIOServerCnxn)
EndOfStreamException: Unable to read additional data from client sessionid 0x164b12214f90001, likely client has closed socket
	at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
	at java.lang.Thread.run(Thread.java:748)
[2018-07-19 14:21:50,165] INFO Closed socket connection for client /127.0.0.1:55971 which had sessionid 0x164b12214f90001 (org.apache.zookeeper.server.NIOServerCnxn)

[3]-  Killed: 9               bin/kafka-server-start.sh config/server-1.properties

Step 5. Use Kafka Connect to import/export data

Step 6. Use Kafka to process streaming data