kafka搭建以及server-properties配置說明
Kafka簡介
由Scala和Java編寫,Kafka是一種高吞吐量的分散式釋出訂閱訊息系統.
術語介紹
- Broker : Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker
- Topic : 每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)
- Partition : Partition是物理上的概念,每個Topic包含一個或多個Partition.
- Producer : 負責釋出訊息到Kafka broker
- Consumer : 訊息消費者,向Kafka broker讀取訊息的客戶端。
- Consumer Group : 每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。
消費模式
為了照顧對MQ不是很瞭解的同學,先講一下MQ的原理.一般MQ都是在服務端儲存一個佇列.生產者把訊息丟到MQ server,消費者從MQ server消費.這樣一來解決了生產者和消費者的高耦合問題,同時也解決了生產速度和消費速度差異導致的消費者跟不上生產者的生產速度而導致的消費者壓力過大問題.
在kafka中的topic就是一系列佇列的總稱,稱為一個主題.當然ActiveMQ和RabbitMQ中都有這個概念.一類訊息都會丟到一個topic中去.
講完topic我們講一下partition(分割槽),這個東西是kafka獨有的東西,也是kafka實現橫向擴充套件和高併發的一個重要設計.我們試想一下,如果每個topic只有一個佇列,隨著業務增加topic裡訊息越來越多.多到一臺server裝不下了怎麼辦.為了解決這個問題,我們引入了partition這個概念.一個partition(分割槽)代表了一個物理上存在的佇列.topic只是一組partition(分割槽)的總稱,也就是說topic僅是邏輯上的概念.這樣一來當topic上的訊息越來越多.我們就可以將新增的partition(分割槽)放在其他server上.也就是說topic裡邊的partition(分割槽)可以分屬於不同的機器.實際生產中,也基本都是這樣玩的.
這裡說一個特殊情況,有時我們建立了一個topic沒有指定partition(分割槽)數量或者指定了partition(分割槽)數量為1,這時實際也是有一個預設的partition(分割槽)的,名字我忘記了.
從Producer(生產者)角度,一個訊息丟到topic中任務就完成了.至於具體丟到了topic中的哪個partition(分割槽),Producer(生產者)不需要關注.這裡kafka自動幫助我們做了負載均衡.當然如果我們指定某個partition(分割槽)也是可以的.這個大家官方文件和百度.
接下里我們講Consumer Group(消費組),Consumer Group(消費組)顧名思義就是一組Consumer(消費者)的總稱.那有了組的概念以後能起到什麼作用.如果只有一組內且組內只有一個Consumer,那這個就是傳統的點對點模式,如果有多組,每組內都有一個Consumer,那這個就是釋出-訂閱(pub-sub)模式.每組都會收到同樣的訊息.
最後講最難理解也是大家討論最多的地方,partition(分割槽)和Consumer(消費者)的關係.首先,一個Consumer(消費者)的一個執行緒在某個時刻只能接收一個partition(分割槽)的資料,一個partition(分割槽)某個時刻也只會把訊息發給一個Consumer(消費者).我們設計出來幾種場景:
場景一: topic-1 下有partition-1和partition-2
group-1 下有consumer-1和consumer-2和consumer-3
所有consumer只有一個執行緒,且都消費topic-1的訊息.
消費情況 : consumer-1只消費partition-1的資料
consumer-2只消費partition-2的資料
consumer-3不會消費到任何資料
原因 : 只能接受一個partition(分割槽)的資料
場景二: topic-1 下有partition-1和partition-2
group-1 下有consumer-1
consumer只有一個執行緒,且消費topic-1的訊息.
消費情況 : consumer-1先消費partition-1的資料
consumer-1消費完partition-1資料後開始消費partition-2的資料
原因 : 這裡是kafka檢測到當前consumer-1消費完partition-1處於空閒狀態,自動幫我做了負載.所以大家看到這裡在看一下上邊那句話的”某個時刻”
特例: consumer在消費訊息時必須指定topic,可以不指定partition,場景二的情況就是發生在不指定partition的情況下,如果consumer-1指定了partition-1,那麼consumer-1消費完partition-1後哪怕處於空閒狀態了也是不會消費partition-2的訊息的.
進而我們總結出了一條經驗,同組內的消費者(單執行緒消費)數量不應多於topic下的partition(分割槽)數量,不然就會出有消費者空閒的狀態,此時併發執行緒數=partition(分割槽)數量.反之消費者數量少於topic下的partition(分割槽)數量也是不理想的,原因是此時併發執行緒數=消費者數量,並不能完全發揮kafka併發效率.
最後我們看下上邊的圖,Consumer Group A的兩個機器分別開啟兩個執行緒消費P0 P1 P2 P3的訊息Consumer Group B的四臺機器單執行緒消費P0 P1 P2 P3的訊息就可以了.此時效率最高.
搭建Kafka環境
安裝Kafka
下載:http://kafka.apache.org/downloads.html
tar zxf kafka-<VERSION>.tgz
cd kafka-<VERSION>
啟動Zookeeper
啟動Zookeeper前需要配置一下config/zookeeper.properties:
接下來啟動Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動Kafka Server
啟動Kafka Server前需要配置一下config/server.properties。主要配置以下幾項,內容就不說了,註釋裡都很詳細:
然後啟動Kafka Server:
bin/kafka-server-start.sh config/server.properties
建立Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
檢視建立的Topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
啟動控制檯Producer,向Kafka傳送訊息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
^C
啟動控制檯Consumer,消費剛剛傳送的訊息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
刪除Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
注:只有當delete.topic.enable=true時,該操作才有效
配置Kafka叢集(單臺機器上)
首先拷貝server.properties檔案為多份(這裡演示4個節點的Kafka叢集,因此還需要拷貝3份配置檔案):
cp config/server.properties config/server1.properties
cp config/server.properties config/server2.properties
cp config/server.properties config/server3.properties
修改server1.properties的以下內容:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
同理修改server2.properties和server3.properties的這些內容,並保持所有配置檔案的zookeeper.connect屬性都指向執行在本機的zookeeper地址localhost:2181。注意,由於這幾個Kafka節點都將執行在同一臺機器上,因此需要保證這幾個值不同,這裡以累加的方式處理。例如在server2.properties上:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
把server3.properties也配置好以後,依次啟動這些節點:
bin/kafka-server-start.sh config/server1.properties &
bin/kafka-server-start.sh config/server2.properties &
bin/kafka-server-start.sh config/server3.properties &
Topic & Partition
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條訊息放進哪個queue裡。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個資料夾,該資料夾下儲存這個Partition的所有訊息和索引檔案。
現在在Kafka叢集上建立備份因子為3,分割槽數為4的Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic kafka
說明:備份因子replication-factor越大,則說明叢集容錯性越強,就是當叢集down掉後,資料恢復的可能性越大。所有的分割槽數裡的內容共同組成了一份資料,分割槽數partions越大,則該topic的訊息就越分散,叢集中的訊息分佈就越均勻。
然後使用kafka-topics.sh的--describe引數檢視一下Topic為kafka的詳情:
輸出的第一行是所有分割槽的概要,接下來的每一行是一個分割槽的描述。可以看到Topic為kafka的訊息,PartionCount=4,ReplicationFactor=3正是我們建立時指定的分割槽數和備份因子。
另外:Leader是指負責這個分割槽所有讀寫的節點;Replicas是指這個分割槽所在的所有節點(不論它是否活著);ISR是Replicas的子集,代表存有這個分割槽資訊而且當前活著的節點。
拿partition:0這個分割槽來說,該分割槽的Leader是server0,分佈在id為0,1,2這三個節點上,而且這三個節點都活著。
再來看下Kafka叢集的日誌:
其中kafka-logs-0代表server0的日誌,kafka-logs-1代表server1的日誌,以此類推。
從上面的配置可知,id為0,1,2,3的節點分別對應server0, server1, server2, server3。而上例中的partition:0分佈在id為0, 1, 2這三個節點上,因此可以在server0, server1, server2這三個節點上看到有kafka-0這個資料夾。這個kafka-0就代表Topic為kafka的partion0。
kafka server-properties配置說明
broker.id =0
每一個broker在叢集中的唯一表示,要求是正數。當該伺服器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的訊息情況
log.dirs=/data/kafka-logs
kafka資料的存放地址,多個地址的話用逗號分割 /data/kafka-logs-1,/data/kafka-logs-2
port =9092
broker server服務埠
message.max.bytes =6525000
表示訊息體的最大大小,單位是位元組
num.network.threads =4
broker處理訊息的最大執行緒數,一般情況下不需要去修改
num.io.threads =8
broker處理磁碟IO的執行緒數,數值應該大於你的硬碟數
background.threads =4
一些後臺任務處理的執行緒數,例如過期訊息檔案的刪除等,一般情況下不需要去做修改
queued.max.requests =500
等待IO執行緒處理的請求佇列最大數,若是等待IO的請求超過這個數值,那麼會停止接受外部訊息,應該是一種自我保護機制。
host.name
broker的主機地址,若是設定了,那麼會繫結到這個地址上,若是沒有,會繫結到所有的介面上,並將其中之一發送到ZK,一般不設定
socket.send.buffer.bytes=100*1024
socket的傳送緩衝區,socket的調優引數SO_SNDBUFF
socket.receive.buffer.bytes =100*1024
socket的接受緩衝區,socket的調優引數SO_RCVBUFF
socket.request.max.bytes =100*1024*1024
socket請求的最大數值,防止serverOOM,message.max.bytes必然要小於socket.request.max.bytes,會被topic建立時的指定引數覆蓋
log.segment.bytes =1024*1024*1024
topic的分割槽是以一堆segment檔案儲存的,這個控制每個segment的大小,會被topic建立時的指定引數覆蓋
log.roll.hours =24*7
這個引數會在日誌segment沒有達到log.segment.bytes設定的大小,也會強制新建一個segment會被 topic建立時的指定引數覆蓋
log.cleanup.policy = delete
日誌清理策略選擇有:delete和compact主要針對過期資料的處理,或是日誌檔案達到限制的額度,會被 topic建立時的指定引數覆蓋
log.retention.minutes=60*24 # 一天後刪除
資料儲存的最大時間超過這個時間會根據log.cleanup.policy設定的策略處理資料,也就是消費端能夠多久去消費資料
log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行刪除,會被topic建立時的指定引數覆蓋
log.retention.bytes=-1
topic每個分割槽的最大檔案大小,一個topic的大小限制 = 分割槽數*log.retention.bytes。-1沒有大小限log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行刪除,會被topic建立時的指定引數覆蓋
log.retention.check.interval.ms=5minutes
檔案大小檢查的週期時間,是否處罰 log.cleanup.policy中設定的策略
log.cleaner.enable=false
是否開啟日誌壓縮
log.cleaner.threads = 2
日誌壓縮執行的執行緒數
log.cleaner.io.max.bytes.per.second=None
日誌壓縮時候處理的最大大小
log.cleaner.dedupe.buffer.size=500*1024*1024
日誌壓縮去重時候的快取空間,在空間允許的情況下,越大越好
log.cleaner.io.buffer.size=512*1024
日誌清理時候用到的IO塊大小一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
日誌清理中hash表的擴大因子一般不需要修改
log.cleaner.backoff.ms =15000
檢查是否處罰日誌清理的間隔
log.cleaner.min.cleanable.ratio=0.5
日誌清理的頻率控制,越大意味著更高效的清理,同時會存在一些空間上的浪費,會被topic建立時的指定引數覆蓋
log.cleaner.delete.retention.ms =1day
對於壓縮的日誌保留的最長時間,也是客戶端消費訊息的最長時間,同log.retention.minutes的區別在於一個控制未壓縮資料,一個控制壓縮後的資料。會被topic建立時的指定引數覆蓋
log.index.size.max.bytes =10*1024*1024
對於segment日誌的索引檔案大小限制,會被topic建立時的指定引數覆蓋
log.index.interval.bytes =4096
當執行一個fetch操作後,需要一定的空間來掃描最近的offset大小,設定越大,代表掃描速度越快,但是也更好記憶體,一般情況下不需要搭理這個引數
log.flush.interval.messages=None
log檔案”sync”到磁碟之前累積的訊息條數,因為磁碟IO操作是一個慢操作,但又是一個”資料可靠性"的必要手段,所以此引數的設定,需要在"資料可靠性"與"效能"之間做必要的權衡.如果此值過大,將會導致每次"fsync"的時間較長(IO阻塞),如果此值過小,將會導致"fsync"的次數較多,這也意味著整體的client請求有一定的延遲.物理server故障,將會導致沒有fsync的訊息丟失.
log.flush.scheduler.interval.ms =3000
檢查是否需要固化到硬碟的時間間隔
log.flush.interval.ms = None
僅僅通過interval來控制訊息的磁碟寫入時機,是不足的.此引數用於控制"fsync"的時間間隔,如果訊息量始終沒有達到閥值,但是離上一次磁碟同步的時間間隔達到閥值,也將觸發.
log.delete.delay.ms =60000
檔案在索引中清除後保留的時間一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
控制上次固化硬碟的時間點,以便於資料恢復一般不需要去修改
auto.create.topics.enable =true
是否允許自動建立topic,若是false,就需要通過命令建立topic
default.replication.factor =1
是否允許自動建立topic,若是false,就需要通過命令建立topic
num.partitions =1
每個topic的分割槽個數,若是在topic建立時候沒有指定的話會被topic建立時的指定引數覆蓋
以下是kafka中Leader,replicas配置引數
controller.socket.timeout.ms =30000
partition leader與replicas之間通訊時,socket的超時時間
controller.message.queue.size=10
partition leader與replicas資料同步時,訊息的佇列尺寸
replica.lag.time.max.ms =10000
replicas響應partition leader的最長等待時間,若是超過這個時間,就將replicas列入ISR(in-sync replicas),並認為它是死的,不會再加入管理中
replica.lag.max.messages =4000
如果follower落後與leader太多,將會認為此follower[或者說partition relicas]已經失效
##通常,在follower與leader通訊時,因為網路延遲或者連結斷開,總會導致replicas中訊息同步滯後
##如果訊息之後太多,leader將認為此follower網路延遲較大或者訊息吞吐能力有限,將會把此replicas遷移
##到其他follower中.
##在broker數量較少,或者網路不足的環境中,建議提高此值.
replica.socket.timeout.ms=30*1000
follower與leader之間的socket超時時間
replica.socket.receive.buffer.bytes=64*1024
leader複製時候的socket快取大小
replica.fetch.max.bytes =1024*1024
replicas每次獲取資料的最大大小
replica.fetch.wait.max.ms =500
replicas同leader之間通訊的最大等待時間,失敗了會重試
replica.fetch.min.bytes =1
fetch的最小資料尺寸,如果leader中尚未同步的資料不足此值,將會阻塞,直到滿足條件
num.replica.fetchers=1
leader進行復制的執行緒數,增大這個數值會增加follower的IO
replica.high.watermark.checkpoint.interval.ms =5000
每個replica檢查是否將最高水位進行固化的頻率
controlled.shutdown.enable =false
是否允許控制器關閉broker ,若是設定為true,會關閉所有在這個broker上的leader,並轉移到其他broker
controlled.shutdown.max.retries =3
控制器關閉的嘗試次數
controlled.shutdown.retry.backoff.ms =5000
每次關閉嘗試的時間間隔
leader.imbalance.per.broker.percentage =10
leader的不平衡比例,若是超過這個數值,會對分割槽進行重新的平衡
leader.imbalance.check.interval.seconds =300
檢查leader是否不平衡的時間間隔
offset.metadata.max.bytes
客戶端保留offset資訊的最大空間大小
kafka中zookeeper引數配置
zookeeper.connect = localhost:2181
zookeeper叢集的地址,可以是多個,多個之間用逗號分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.session.timeout.ms=6000
ZooKeeper的最大超時時間,就是心跳的間隔,若是沒有反映,那麼認為已經死了,不易過大
zookeeper.connection.timeout.ms =6000
ZooKeeper的連線超時時間
zookeeper.sync.time.ms =2000