1. 程式人生 > >Kafka 2.11-0.2 部署文件

Kafka 2.11-0.2 部署文件


磁碟初始化:
pvcreate /dev/vdb
vgcreate VG0 /dev/vdb
#建立一個佔全部卷組大小的lv
lvcreate -l +100%FREE -n LV0 VG0
mkfs.ext4 /dev/VG0/LV0

echo "/dev/VG0/LV0  /data ext4 defaults              0 0" >>/etc/fstab
#重新掛在磁碟
mount -a
+++++++++++++++++
安裝zookeeper叢集:

1.1 安裝java環境
yum install java-1.8.0-openjdk-devel.x86_64 java-1.8.0-openjdk.x86_64 -y

1.2 安裝zk
mkdir /data/app
wget http://10.69.36.241/soft/zookeeper-3.4.9.tar.gz
tar zxvf zookeeper-3.4.9.tar.gz -C /data/app

1.3 設定環境變數
echo 'export ZOOKEEPER_HOME=/data/app/zookeeper-3.4.9' >>/etc/profile
echo 'export PATH=$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf' >>/etc/profile
source /etc/profile

1.4 修改配置檔案
cd /data/app/zookeeper-3.4.9/conf/
cp /data/app/zookeeper-3.4.9/conf/zoo_sample.cfg /data/app/zookeeper-3.4.9/conf/zoo.cfg
vim zoo.cfg

#心跳時間間隔,也是時間單元
tickTime=2000

#叢集中的follower伺服器(F)與leader伺服器(L)之間 初始連線 時能容忍的最多心跳數(tickTime的數量)
initLimit=10

#leader 與 follower 之間傳送訊息,請求和應答 時間長度。如果 follower 在設定的時間內不能與leader 進行通訊,那麼此 follower 將被丟棄。
syncLimit=5

dataDir=/data/app/zookeeper-3.4.9/var/data
dataLogDir=/data/app/zookeeper-3.4.9/var/datalog

clientPort=2181


server.1=10.69.40.17:2888:3888
server.2=10.69.40.12:2888:3888
server.3=10.69.40.7:2888:3888
server.4=10.69.40.11:2888:3888
server.5=10.69.40.16:2888:3888

# 配置zk日誌自動清理,否則很快會佔滿磁碟
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1




設定日誌目錄
預設是在啟動時的目錄下生成zookeeper.out檔案
vim /data/app/zookeeper-3.4.9/conf/zookeeper-env.sh 

#!/bin/bash	
JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.191.b12-0.el7_5.x86_64
ZOO_LOG_DIR=/data/app/zookeeper-3.4.9/logs
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"




建立配置檔案中dataDir,dataLogDir中的路徑目錄
mkdir -p /data/app/zookeeper-3.4.9/var/data
mkdir -p /data/app/zookeeper-3.4.9/var/datalog

cd /data/app/zookeeper-3.4.9/var/data
vim myid
向其中輸入數字1(對應zoo.cfg檔案server後的數字)


1.5
拷貝zk目錄到其他節點,根據zoo.cfg中對應的值進行更改myid檔案


1.6 啟動zk,需要在每臺機器上執行
zkServer.sh start

啟動後的程序為QuorumPeerMain
可通過zkServer.sh status 命令來檢視zk的狀態,正常是機器中只有一個leader,其他都是follower。


1.7 設定開機啟動zk(未做)
/data/app/zookeeper-3.4.9/bin/zkServer.sh start








++++++++++++++
2 安裝kafka叢集:


2.1 安裝java環境
yum install java-1.8.0-openjdk-devel.x86_64 java-1.8.0-openjdk.x86_64 -y

2.2 安裝kafka:
mkdir /data/app
wget http://10.69.36.241/soft/kafka_2.11-0.11.0.2.tgz
tar zxvf kafka_2.11-0.11.0.2.tgz -C /data/app/

mkdir -p /data/app/kafka_2.11-0.11.0.2/kafka-logs
#建立kafka訊息目錄,主要存放kafka訊息,和logs目錄不一樣,logs目錄主要存放程式啟動之類的日誌。

2.3 設定環境變數
echo 'export KAFKA_HOME=/data/app/kafka_2.11-0.11.0.2' >>/etc/profile
echo 'export PATH=$KAFKA_HOME/bin:$KAFKA_HOME/config:$PATH' >>/etc/profile
source /etc/profile

2.4 修改配置檔案
vim /data/app/kafka_2.11-0.11.0.2/config/server.properties
# 當前機器在叢集中的唯一標識,和zk中的myid性質一樣,每臺機器不同
broker.id=0

listeners=PLAINTEXT://:9092
port=9092

# broker繫結的介面,預設繫結所有interfaces
host.name=10.69.40.14

# 不設定,kafka向zk註冊的時候使用主機名,而不是IP
advertised.host.name=10.69.40.14

# broker進行網路處理的執行緒數。預設3
num.network.threads=3

# broker進行IO處理的執行緒數 ,預設8
num.io.threads=8

# 傳送緩衝區大小,資料在緩衝區到了一定大小後在傳送,能提高效能
socket.send.buffer.bytes=102400

# 接受緩衝區大小,資料到了一定大小後在序列化到磁碟
socket.receive.buffer.bytes=102400

# 向kafka傳送訊息或者向kafka請求訊息時的請求的最大數,不能超過java堆疊大小
socket.request.max.bytes=104857600

# kafka訊息的存放目錄,可以配置多個目錄,以逗號分隔
log.dirs=/data/app/kafka_2.11-0.11.0.2/kafka-logs

# 分割槽數量,儘量是主機數的倍數關係
num.partitions=36

num.recovery.threads.per.data.dir=6

# 預設訊息的最大持久化時間,7天
log.retention.hours=48

# kafka訊息追加到檔案,超過這個值,另新起一個檔案
log.segment.bytes=1073741824

#每隔多少時間檢查log持久化時間
log.retention.check.interval.ms=300000

# 是否啟用log壓縮,(啟用用於清理offset主題)
log.cleaner.enable=true

# 訊息儲存的副本數
default.replication.factor=3
delete.topic.enable=true
auto.create.topics.enable=true
#zk連線
zookeeper.connect=10.69.40.17:2181,10.69.40.12:2181,10.69.40.7:2181,10.69.40.11:2181,10.69.40.16:2181
zookeeper.connection.timeout.ms=6000





kafka啟動指令碼kafka-server-start.sh中指定了kafka啟動時需要的最小記憶體,預設為1G

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"


2.5 啟動kafka
kafka-server-start.sh -daemon /data/app/kafka_2.11-0.11.0.2/config/server.properties
啟動後會有一個kafka的程序

新增開機啟動
echo '/data/app/kafka_2.11-0.11.0.2/bin/kafka-server-start.sh -daemon /data/app/kafka_2.11-0.11.0.2/config/server.properties'>>/etc/rc.local

-----3.測試---------------------------------------------------------------------------------------------------
kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 9 --replication-factor 3

①在一臺機器上:
kafka-console-producer.sh --broker-list 10.69.69.206:9092,10.69.69.121:9092,10.69.69.142:9092 --sync --topic test
隨意輸入內容
②在另一臺機器上:
kafka-console-consumer.sh --zookeeper 10.69.69.206:2181,10.69.69.121:2181,10.69.69.142:2181 --topic ACCESS_TOPIC_1 --from-beginning

③kafka-topics.sh --zookeeper 10.69.69.206:2181,10.69.69.121:2181,10.69.69.142:2181 --describe

④kafka-topics.sh --zookeeper localhost:2181 --delete  --topic test

⑤kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic test --broker-info --group group1

kafka-consumer-offset-checker.sh --zookeeper node1:2181 node2:2181 node3:2181 --topic mytopic --broker-info --group group1

(09之後的新版本)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.69.40.13:9092 --group lx_test --describe
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 10.69.40.13:9092 --list