kafka叢集搭建以及遇到的異常
1、kafka下載
官網地址:http://kafka.apache.org/
2、我的環境
Red Hat Linux 7.0、jdk1.8.0_144、zookeeper-3.4.10、kafka_2.12-0.11、伺服器3臺(虛擬機器)
3、安裝kafka
a、下載kafka_2.12-0.11壓縮包,解壓到指定目錄
b、修改配置檔案server.properties(以節點:192.168.34.128為例),配置檔案都有詳細英文說明
broker.id=128 #節點的ID,必須與其它節點不同 delete.topic.enable=true listeners=PLAINTEXT://:9092 #監聽埠 advertised.listeners=PLAINTEXT://192.168.34.128:9092 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3 #接受網路請求的執行緒數 num.io.threads=8 #進行磁碟IO的執行緒數 socket.send.buffer.bytes=102400 #套接字伺服器使用的傳送緩衝區大小 socket.receive.buffer.bytes=102400 #套接字伺服器使用的接收緩衝區大小 socket.request.max.bytes=104857600 #單個請求最大能接收的資料量 log.dirs=/app/soft/data/kafka_data #指定儲存日誌檔案目錄 num.partitions=6 # 每個主題的日誌分割槽的預設數量 num.recovery.threads.per.data.dir=1 # 每個資料目錄中的執行緒數,用於在啟動時日誌恢復,並在關閉時重新整理。 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.flush.interval.messages=10000 # 在強制重新整理資料到磁碟執勤啊允許接收訊息的數量 log.flush.interval.ms=1000 # 在強制重新整理之前,訊息可以在日誌中停留的最長時間 log.retention.hours=168 # 一個日誌的最小存活時間,可以被刪除 log.segment.bytes=1073741824 #指定資料分割的檔案大小 log.retention.check.interval.ms=300000 # 檢查日誌段的時間間隔,看是否可以根據保留策略刪除它們 zookeeper.connect=192.168.34.128:2181,192.168.34.129:2181,192.168.34.130:2181 這是一個以逗號為分割的部分,每一個都匹配一個Zookeeper zookeeper.connection.timeout.ms=6000 #連線zk的超時時間 group.initial.rebalance.delay.ms=0
其它兩個節點broker.id、listeners、advertised.listeners修改對應的值即可,這裡就不一一列舉了!!!
c、啟動kafka叢集
nohup bin/kafka-server-start.sh config/server.properties > /dev/null &
3臺伺服器分別執行這個命令就可以啟動kafka了
d、啟動過程中可能出現的異常
Java HotSpot(TM) 64-Bit Server VM warning: INFO:os::commit_memory(0x00000006fff80000, 2863661056, 0) failed; error=’Cannotallocate memory’ (errno=12
原因就是記憶體不夠引起的,修改kafka bin目錄下的 kafka-server-start.sh,調整下面這三個值即可,我這裡是虛擬機器,公司的伺服器不方便給大家看,總之根據自己伺服器的記憶體情況設定即可!!!
1、 -Xms :表示java虛擬機器堆區記憶體初始記憶體分配的大小,通常為作業系統可用記憶體的1/64大小即可,但仍需按照實際情況進行分配。有可能真的按照這樣的一個規則分配時,設計出的軟體還沒有能夠執行得起來就掛了。 2、 -Xmx: 表示java虛擬機器堆區記憶體可被分配的最大上限,通常為作業系統可用記憶體的1/4大小。但是開發過程中,通常會將 -Xms 與 -Xmx兩個引數的配置相同的值,其目的是為了能夠在java垃圾回收機制清理完堆區後不需要重新分隔計算堆區的大小而浪費資源。 3、 -XX:newSize:表示新生代初始記憶體的大小,應該小於 -Xms的值; 4、 -XX:MaxnewSize:表示新生代可被分配的記憶體的最大上限;當然這個值應該小於 -Xmx的值; 5、 -Xmn:至於這個引數則是對 -XX:newSize、-XX:MaxnewSize兩個引數的同時配置,也就是說如果通過-Xmn來配置新生代的記憶體大小,那麼-XX:newSize = -XX:MaxnewSize = -Xmn,雖然會很方便,但需要注意的是這個引數是在JDK1.4版本以後才使用的。
e、 傳送訊息出現這樣的異常:Failed to send messages after 3tries.
Exception in thread"main" kafka.common.FailedToSendMessageException: Failed to sendmessages after 3 tries.
atkafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
atkafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
atcom.tuan55.kafka.test.TestP.main(TestP.java:20)
原因就是kafka服務可能掛了,可以檢視kafka的監聽埠,netstat -nltp | grep 9092 或者jps命令都可以
f、啟動kafka時,發現zookeeper不能啟動了,刪除zookeeper_server.pid、zookeeper.out再重啟,如果還不能啟動,再刪除versio-2繼續重啟
g、解決掉這些問題,kafka就可以用了,kafka的命令以及程式碼中的簡單使用我在上期已經寫了,安裝就這些了,都是自己的個人總結,歡迎大家來吐槽,我會及時修改內容,謝謝!!!