ELK叢集-Kafka安裝配置(三)
安裝
kafka:
tar -zxvf kafka_2.11-2.1.0.tgz
mv kafka_2.11-2.1.0 /home/kafka-2.11
zookeeper:
tar -zxvf zookeeper-3.4.13.tar.gz
mv zookeeper-3.4.13 /home/zookeeper-3.4.13
配置
kafka => server.properties
#每一個broker在叢集中的唯一表示,要求是正數。當該伺服器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的訊息情況 broker.id=0 #192.168.1.199-內網ip,PLAINTEXT協議 advertised.listeners=PLAINTEXT://192.168.1.199:9092 #broker處理訊息的最大執行緒數,一般情況下數量為cpu核數 num.network.threads=2 #broker處理磁碟IO的執行緒數,數值為cpu核數2倍 num.io.threads=8 #socket的傳送緩衝區,socket的調優引數SO_SNDBUFF socket.send.buffer.bytes=1048576 #socket的接受緩衝區,socket的調優引數SO_RCVBUFF socket.receive.buffer.bytes=1048576 #socket請求的最大數值,防止serverOOM,message.max.bytes必然要小於socket.request.max.bytes,會被topic建立時的指定引數覆蓋 socket.request.max.bytes=104857600 #kafka資料的存放地址,多個地址的話用逗號分割,多個目錄分佈在不同磁碟上可以提高讀寫效能 /data/kafka-logs-1,/data/kafka-logs-2 log.dirs=/tmp/kafka-logs #每個topic的分割槽個數,若是在topic建立時候沒有指定的話會被topic建立時的指定引數覆蓋 num.partitions=2 #資料檔案保留多長時間, 儲存的最大時間超過這個時間會根據log.cleanup.policy設定資料清除策略log.retention.bytes和log.retention.minutes或log.retention.hours任意一個達到要求, #都會執行刪除,有2刪除資料檔案方式:按照檔案大小刪除:log.retention.bytes,按照2中不同時間粒度刪除:分別為分鐘,小時 log.retention.hours=168 #topic的分割槽是以一堆segment檔案儲存的,這個控制每個segment的大小,會被topic建立時的指定引數覆蓋 log.segment.bytes=536870912 #檔案大小檢查的週期時間,是否處罰 log.cleanup.policy中設定的策略 log.retention.check.interval.ms=60000 #是否開啟日誌清理 log.cleaner.enable=false #zookeeper叢集的地址,可以是多個,多個之間用逗號分割 hostname1:port1,hostname2:port2,hostname3:port3 zookeeper.connect=localhost:2181 #ZooKeeper的連線超時時間 zookeeper.connection.timeout.ms=1000000 #ZooKeeper叢集中leader和follower之間的同步實際那 zookeeper.sync.time.ms =2000
zookeeper => zoo.cfg
新增環境變數
vi /etc/profile
新增
ZOOKEEPER_HOME=/home/zookeeper-3.4.13
export ZOOKEEPER_HOME
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$ZOOKEEPER_HOME/lib:
配置
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit= 10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/zookeeper-3.4.13/data
dataLogDir=/home/zookeeper-3.4.13/logs
# the port at which the clients will connect
clientPort= 2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
配置說明:
tickTime:這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
dataDir:顧名思義就是 Zookeeper 儲存資料的目錄,預設情況下,Zookeeper 將寫資料的日誌檔案也儲存在這個目錄裡。
dataLogDir: log目錄, 同樣可以是任意目錄. 如果沒有設定該引數, 將使用和dataDir相同的設定。
clientPort:這個埠就是客戶端連線 Zookeeper 伺服器的埠,Zookeeper 會監聽這個埠,接受客戶端的訪問請求。
啟動
進入zookeeper下bin目錄,執行
./zkServer.sh start #啟動
netstat -tunlp|grep 2181 #檢視zookeeper埠
./zkServer.sh stop #停止
進入kafka下bin目錄,執行
./kafka-server-start.sh ../config/server.properties
或
nohup ./kafka-server-start.sh ../config/server.properties & (後臺啟動)
kafka簡單測試
1.建立一個topic
引數資訊:Zookeeper的資訊,Topic的名字,Topic的Partition數,複製因子(複製因子必須小於等於Broker數目)
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2.查詢topic列表
./kafka-topics.sh --list --zookeeper localhost:2181
3.Producer建立訊息
啟動時,除了列印SLF4J之外,沒有別的。下面可以直接輸入生產的資料 (生產訊息時不需要指定Partition,Kafka自動做Partition路由,每個Partition都是有Lead Partition和Follower Partitions組成,Lead Partition負責讀寫,而Follower Partitions只做複製,在Lead Partition掛了之後,自動做 Failover )
[[email protected] bin]$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
4.Consumer消費訊息
啟動時,除了列印SLF4J之外,沒有別的 –from-beginning實際上是指定offset的讀取策略。
smallest和largest策略表示Zookeeper上的offset還沒有初始化為正確值時,如何初始化offset的問題?試想,Producer生產了一批訊息到Kafka中,但是Kafka尚未由任何Consumer讀取,而Kafka的Offset是由Consumer進行初始化和賦值的,因此此時的Zookeeper上的offset並沒有預期的0(0表示尚未讀取過),而是一個不正確的隨機數,那麼Consumer來讀取訊息時,是從頭開始讀還是從最大的位置等待Producer建立訊息後再讀取,此時就產生了兩個選擇,smallest表示從頭讀,largest表示從最大位置讀
auto.offset.reset(預設是largest):
What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
參考:1.kafka配置引數說明
2.kafka安裝
3.kafka,server.properties配置