Kafka Overview and Cluster Configuration
I. Kafka Overview
1. Introduction
Kafka is a high-throughput distributed message queue system. It follows the producer-consumer model, guarantees FIFO ordering (within a partition), does not lose data on its own, and by default purges data after 7 days. Common message-queue scenarios: decoupling systems from one another, buffering peak load, and asynchronous communication.
2. Cluster Architecture
(1) The Kafka architecture consists of producers (message producers), consumers (message consumers), brokers (the servers of the Kafka cluster, which handle message read/write requests and store messages; the "kafka cluster" layer actually contains many brokers), topics (the message queues/categories, each roughly a queue with producers and consumers attached), and ZooKeeper (metadata is kept in ZooKeeper, including consumer offsets for the old consumer, topic information, and partition information).
(2) Messages in Kafka are organized by topic. A topic can be pictured as a queue, and each topic is further split into many partitions for parallelism. Within a partition messages are strictly ordered, like an ordered queue, and each message carries a sequence number, the offset, e.g. 0 through 12, written at the tail and read from the head. A partition lives on one broker, and one broker can host several partitions; for example, if a topic has 6 partitions and there are 2 brokers, each broker hosts 3 partitions. A partition can be pictured simply as a file: incoming data is appended to it, written straight to the file rather than held in an application-level memory buffer. Kafka also differs from many messaging systems that delete a message once it is consumed: Kafka deletes data according to a time-based retention policy, so there is no notion of "consumed" for deletion purposes, only "expired".
(3) The producer decides which partition to write to, and there are several strategies for this, e.g. hashing by key so that related data lands in one partition and no cross-partition joins are needed. Each consumer tracks the offset it has consumed up to, and every consumer belongs to a group. Within a group, consumption follows the queue model (the consumers in a group consume different partitions, so a message is consumed only once inside the group); across groups it follows the publish-subscribe model: groups consume independently without affecting each other, so a message is consumed once by every group.
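A quick way to observe both models is with the console consumer (a sketch; it assumes the kfk_test topic and the node01-node03 brokers that are set up later in this post):
# Two consumers started with the SAME group id split the partitions between
# them (queue model: each message is delivered to only one of them).
./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic kfk_test --consumer-property group.id=group_a
# A consumer with a DIFFERENT group id receives its own full copy of the
# stream (publish-subscribe model: every group sees every message).
./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic kfk_test --consumer-property group.id=group_b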
3. Leader load-balancing mechanism
When a broker stops or crashes, all partitions that had it as leader move their leadership to other brokers. In the extreme case one broker ends up leading many partitions, causing load imbalance; and when the stopped broker restarts, if it is no longer the leader of any partition, Kafka clients will not read from it at all, so its resources are wasted.
Kafka has the concept of preferred replicas. If a partition has 3 replicas with priorities 0, 1 and 2, replica 0 is the preferred replica and acts as leader. When broker 0 goes down, broker 1 takes over as leader; when broker 0 comes back up, it is automatically restored as the partition's leader. This avoids the load imbalance and wasted resources described above, and is Kafka's leader balancing mechanism.
Enable it in config/server.properties (it is on by default):
auto.leader.rebalance.enable=true
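The rebalance check only runs periodically; if it is disabled, or you do not want to wait, the preferred-replica election can also be triggered by hand with the tool shipped in this Kafka version (a sketch, assuming the ZooKeeper addresses configured below):
./kafka_2.11-1.1.0/bin/kafka-preferred-replica-election.sh --zookeeper node01:2181,node02:2181,node03:2181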
II. Cluster Configuration
1. ZooKeeper installation and configuration
(1) Download and extract
On node01, under /opt/bigdata/, extract: tar -zxvf zookeeper-3.4.6.tar.gz
(2) Edit the configuration
- Configure hosts: vim /etc/hosts
192.168.172.73 node03
192.168.172.72 node02
192.168.172.71 node01
- Configure the ZooKeeper environment variables (e.g. appended to /etc/profile on each node, then reloaded with source /etc/profile):
export ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.6 # ZooKeeper install path
export PATH=$ZOOKEEPER_HOME/bin:$PATH
- Configure zoo.cfg
Under /opt/bigdata, copy the sample config: cp zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg
Edit it: vim zookeeper-3.4.6/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/bigdata/data/zookeeper/zkdata
# ZooKeeper transaction log directory (zoo.cfg does not support inline
# comments, so they must sit on their own line)
dataLogDir=/opt/bigdata/data/zookeeper/zkdatalog
# the port at which the clients will connect
clientPort=2181
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
On each of node01, node02 and node03, create a myid file under /opt/bigdata/data/zookeeper/zkdata (vim myid) containing 1, 2 and 3 respectively, then save it.
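Equivalently, as one-liners (run on each node; the number written must match that node's server.N line in zoo.cfg):
mkdir -p /opt/bigdata/data/zookeeper/zkdata /opt/bigdata/data/zookeeper/zkdatalog
echo 1 > /opt/bigdata/data/zookeeper/zkdata/myid # use 2 on node02 and 3 on node03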
(3) Distribute to the other nodes
On node01, under /opt/bigdata, use scp to copy the directory to the corresponding path on node02 and node03:
scp -r zookeeper-3.4.6 node02:`pwd`
scp -r zookeeper-3.4.6 node03:`pwd`
(4) Start the ZooKeeper cluster
Run zkServer.sh start on node01, node02 and node03 to start ZooKeeper.
After a moment, run zkServer.sh status on each of the three nodes to check their state:
[root@node01 ~]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: leader
[root@node02 bigdata]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
[root@node03 ~]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
2. Kafka installation and configuration
(1) Download and extract
On node01, under /opt/bigdata/, extract:
tar zxvf kafka_2.11-1.1.0.tgz
(2) Edit the configuration
Under /opt/bigdata/, edit the config file: vim kafka_2.11-1.1.0/config/server.properties
Three parameters matter most: broker.id identifies this broker (unique per node), log.dirs is the path where Kafka stores message data, and zookeeper.connect is the connection string for the ZooKeeper cluster.
The other parameters can be left at their defaults, or tuned to your situation.
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092
# The port the socket server listens on
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/opt/bigdata/kafka_2.11-1.1.0/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
zookeeper.connect=node01:2181,node02:2181,node03:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
(3) Distribute to the other nodes
Under /opt/bigdata, copy to the corresponding path on node02 and node03 (scp needs passwordless ssh set up):
scp -r kafka_2.11-1.1.0 node02:`pwd`
scp -r kafka_2.11-1.1.0 node03:`pwd`
Then edit server.properties on node02 and node03, changing broker.id to 2 and 3 respectively.
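For example, from node01 (a sketch; it assumes passwordless ssh and that the copies are otherwise unmodified):
ssh node02 "sed -i 's/^broker.id=1$/broker.id=2/' /opt/bigdata/kafka_2.11-1.1.0/config/server.properties"
ssh node03 "sed -i 's/^broker.id=1$/broker.id=3/' /opt/bigdata/kafka_2.11-1.1.0/config/server.properties"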
(4) Start the Kafka cluster
Start the ZooKeeper cluster before the Kafka cluster; if ZooKeeper is not running yet, start it first.
Under /opt/bigdata, run the following command on each of the three nodes to start the Kafka cluster (with -daemon the process is already backgrounded, so no trailing & is needed):
./kafka_2.11-1.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.11-1.1.0/config/server.properties
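To confirm a broker came up, check for the Kafka process and its startup log on that node (paths assume the layout above):
jps | grep Kafka # the broker runs as the "Kafka" JVM process
tail -n 20 ./kafka_2.11-1.1.0/logs/server.log # expect a line like "[KafkaServer id=1] started"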
(5) Basic operations
1) Create a topic
./kafka_2.11-1.1.0/bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 2 --topic kfk_test
2) List the topics that have been created
./kafka_2.11-1.1.0/bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
3) Produce data
./kafka_2.11-1.1.0/bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kfk_test
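The console producer then reads from standard input and sends each line you type as one message, for example:
>hello kafka
>this is the second message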
4) Consume the produced data
Before Kafka 0.9, the console consumer connects through ZooKeeper:
./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic kfk_test
Since 0.9 the ZooKeeper mode is no longer recommended; it is still supported but will eventually be removed, and the bootstrap-server mode is preferred:
./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic kfk_test
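Either consumer should replay whatever was typed into the producer above, e.g.:
hello kafka
this is the second message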
5) Describe a topic
./kafka_2.11-1.1.0/bin/kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic kfk_test
The output looks like:
Topic:kfk_test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: kfk_test Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: kfk_test Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
We can see the 2 partitions and 2 replicas:
partition: the partition id
leader: the broker id currently serving reads and writes for this partition (the broker.id set in server.properties)
replicas: the full list of brokers holding a replica of this partition
isr: in-sync replicas, the subset of replicas that are alive and caught up with the leader