5 kafka叢集搭建
阿新 • • 發佈:2019-01-05
kafka叢集搭建
安裝scala
- 下載scala
- 將下載的
scala
包解壓縮到/usr/local
資料夾下 - 修改
scala
資料夾名字為scala
- 配置環境變數
- 按照上述步驟在
spark2
和spark3
機器上都安裝好scala
。使用scp
將scala
和.bashrc
拷貝到spark2
和spark3
上即可。
安裝kafka
- 下載kafka
- 將下載的
kafka
包解壓縮到/usr/local
資料夾下 - 修改
kafka
資料夾名字為kafka
- 配置
kafka
vi /usr/local/kafka/config/server.properties broker.id:# 依次增長的整數,0、1、2、3、4,叢集中Broker的唯一id zookeeper.connect=192.168.75.111:2181,192.168.75.112:2181,192.168.75.113:2181
- 安裝
slf4j
,將slf4j
解壓到/usr/local/
目錄下 - 把
slf4j
中的slf4j-nop-*.jar
複製到kafka
的lib
目錄下面
搭建kafka叢集
- 按照上述步驟在spark2和spark3分別安裝kafka。用scp把kafka拷貝到spark2和spark3行即可。
啟動kafka叢集
- 在三臺機器上分別執行以下命令:
nohup bin/kafka-server-start.sh config/server.properties &
- 解決
kafka Unrecognized VM option 'UseCompressedOops'
問題
vi bin/kafka-run-class.sh if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true" fi # 去掉-XX:+UseCompressedOops即可
- 使用jps檢查啟動是否成功
測試kafka叢集
- 使用基本命令檢查
kafka
是否搭建成功
# 在spark1上建立一個TestTopic bin/kafka-topics.sh –-zookeeper 192.168.75.111:2181,192.168.75.112:2181,192.168.75.113:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create # 在spark1上建立一個TestTopic的生產者 bin/kafka-console-producer.sh --broker-list 192.168.75.111:9092,192.168.75.112:9092,192.168.75.113:9092 --topic TestTopic # 開啟spark1的另一個session,建立一個TestTopic的消費者 bin/kafka-console-consumer.sh --zookeeper 192.168.75.111:2181,192.168.75.112:2181,192.168.75.113:2181 --topic TestTopic --from-beginning # 然後在生產者出輸入 hello world # 相應的在消費者處也會產生 hello world