1. 程式人生 > >5 kafka叢集搭建

5 kafka叢集搭建

kafka叢集搭建

安裝scala

  • 下載scala
  • 將下載的scala包解壓縮到/usr/local資料夾下
  • 修改scala資料夾名字為scala
  • 配置環境變數
  • 按照上述步驟在spark2spark3機器上都安裝好scala。使用scpscala.bashrc拷貝到spark2spark3上即可。

安裝kafka

  • 下載kafka
  • 將下載的kafka包解壓縮到/usr/local資料夾下
  • 修改kafka資料夾名字為kafka
  • 配置kafka
vi /usr/local/kafka/config/server.properties

broker.id:# 依次增長的整數,0、1、2、3、4,叢集中Broker的唯一id
zookeeper.connect=192.168.75.111:2181,192.168.75.112:2181,192.168.75.113:2181
  • 安裝slf4j,將slf4j解壓到/usr/local/目錄下
  • slf4j中的slf4j-nop-*.jar複製到kafkalib目錄下面

搭建kafka叢集

  • 按照上述步驟在spark2和spark3分別安裝kafka。用scp把kafka拷貝到spark2和spark3行即可。

啟動kafka叢集

  • 在三臺機器上分別執行以下命令:
nohup bin/kafka-server-start.sh config/server.properties &
  • 解決kafka Unrecognized VM option 'UseCompressedOops'問題
vi bin/kafka-run-class.sh 
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
fi
# 去掉-XX:+UseCompressedOops即可
  • 使用jps檢查啟動是否成功

測試kafka叢集

  • 使用基本命令檢查kafka是否搭建成功
# 在spark1上建立一個TestTopic
bin/kafka-topics.sh –-zookeeper 192.168.75.111:2181,192.168.75.112:2181,192.168.75.113:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create

# 在spark1上建立一個TestTopic的生產者
bin/kafka-console-producer.sh --broker-list 192.168.75.111:9092,192.168.75.112:9092,192.168.75.113:9092 --topic TestTopic

# 開啟spark1的另一個session,建立一個TestTopic的消費者
bin/kafka-console-consumer.sh --zookeeper 192.168.75.111:2181,192.168.75.112:2181,192.168.75.113:2181 --topic TestTopic --from-beginning

# 然後在生產者出輸入
hello world

# 相應的在消費者處也會產生
hello world