1. 程式人生 > >Windows server 2008 部署Kafka Zookeeper小結

Windows server 2008 部署Kafka Zookeeper小結

1.首先先下載Zookeeper : Zookeeper下載連結

2.下載Kafka :Kafka下載連結 ,注意:程式碼中使用Producer的話,要注意專案引用的jar包版本要和安裝的Kafka版本一致

3.安裝Kafka :

解壓Kafka,注意安裝目錄不能有空格,建一個log資料夾給kafka作為日誌儲存路徑,假設這裡是:C:\\administrator\\software\\kafka\\log

開啟Kafka的安裝目錄下的config/server.properties,加入:log.dirs=C:\\administrator\\software\\kafka\\log

如圖:

然後修改Kafka安裝目錄(例如C:\\administrator\\software\\kafka_2.12-2.0.0)下bin\\windows的kafka-run-class.bat

直接複製吧(主要是%JAVA_HOME%兩邊加上"):

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem     http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

rem Keep every variable assigned below local to this script run. Delayed
rem expansion is enabled because later blocks read variables that are
rem assigned inside the same parenthesised block.
setlocal enabledelayedexpansion

rem The first argument, the Java class to launch, is mandatory.
IF [%1] EQU [] (
	echo USAGE: %0 classname [opts]
	EXIT /B 1
)

rem Using pushd popd to set BASE_DIR to the absolute path two levels above
rem the directory holding this script. Both the path and the assignment are
rem quoted so that an install directory containing spaces or an ampersand
rem does not split the command.
pushd "%~dp0..\.."
set "BASE_DIR=%CD%"
popd

rem Fall back to the bundled Scala version when the caller did not pick one.
if ["%SCALA_VERSION%"] EQU [""] set SCALA_VERSION=2.11.12

rem Derive the binary version from the first two dot-separated fields of
rem SCALA_VERSION, or from the first field alone when there is no second one.
rem Skipped entirely when SCALA_BINARY_VERSION is already set.
if ["%SCALA_BINARY_VERSION%"] EQU [""] for /f "tokens=1,2 delims=." %%a in ("%SCALA_VERSION%") do (
  set FIRST=%%a
  set SECOND=%%b
  set SCALA_BINARY_VERSION=%%a
  if not ["%%b"] EQU [""] set SCALA_BINARY_VERSION=%%a.%%b
)

rem Classpath assembly. Every file matched by a pattern below is handed to
rem the concat subroutine, which appends it to CLASSPATH. Patterns are
rem processed in the order listed, so the resulting classpath order is:
rem core dependencies, examples, clients, streams, streams examples,
rem rocksdb, tools, tools dependencies.
for %%j in (
	"%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar"
	"%BASE_DIR%\examples\build\libs\kafka-examples*.jar"
	"%BASE_DIR%\clients\build\libs\kafka-clients*.jar"
	"%BASE_DIR%\streams\build\libs\kafka-streams*.jar"
	"%BASE_DIR%\streams\examples\build\libs\kafka-streams-examples*.jar"
	"%BASE_DIR%\streams\build\dependant-libs-%SCALA_VERSION%\rocksdb*.jar"
	"%BASE_DIR%\tools\build\libs\kafka-tools*.jar"
	"%BASE_DIR%\tools\build\dependant-libs-%SCALA_VERSION%\*.jar"
) do call :concat "%%j"

rem Connect modules: the module jars first, then the module's dependant-libs
rem directory passed on as one literal wildcard entry when it exists.
for %%m in (api runtime file json tools) do (
	for %%j in ("%BASE_DIR%\connect\%%m\build\libs\connect-%%m*.jar") do call :concat "%%j"
	if exist "%BASE_DIR%\connect\%%m\build\dependant-libs\*" call :concat "%BASE_DIR%\connect\%%m\build\dependant-libs\*"
)

rem Classpath addition for release, followed by the core jar that matches
rem the Scala binary version.
for %%j in (
	"%BASE_DIR%\libs\*"
	"%BASE_DIR%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar"
) do call :concat "%%j"

rem JMX settings: unless the caller supplied KAFKA_JMX_OPTS, enable remote
rem JMX with authentication and SSL switched off.
if ["%KAFKA_JMX_OPTS%"] EQU [""] set KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false

rem JMX port to use: appended only when JMX_PORT is set.
if not ["%JMX_PORT%"] EQU [""] set KAFKA_JMX_OPTS=%KAFKA_JMX_OPTS% -Dcom.sun.management.jmxremote.port=%JMX_PORT%

rem Log directory to use. Defaults to the logs folder under BASE_DIR.
rem The previous default carried a stray tilde inside the variable reference,
rem which named an undefined variable and therefore expanded to nothing,
rem leaving a root-relative logs path. The value is also stored without
rem embedded quotes because every later use wraps it in quotes itself.
IF ["%LOG_DIR%"] EQU [""] (
    set "LOG_DIR=%BASE_DIR%/logs"
)

rem Log4j settings. Without caller-supplied options the tools configuration
rem under BASE_DIR is used. With caller-supplied options the log directory
rem is created if it is missing. The assignment is quoted so a closing
rem parenthesis in BASE_DIR cannot end the block early.
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/tools-log4j.properties"
) ELSE (
  rem create logs directory
  IF not exist "%LOG_DIR%" (
      mkdir "%LOG_DIR%"
  )
)

rem Prepend the log directory as a system property to the log4j options.
set KAFKA_LOG4J_OPTS=-Dkafka.logs.dir="%LOG_DIR%" "%KAFKA_LOG4J_OPTS%"

rem Generic jvm settings you want to add
rem When KAFKA_OPTS is empty it is explicitly cleared.
IF ["%KAFKA_OPTS%"] EQU [""] (
	set KAFKA_OPTS=
)

rem Fallback values used by the debug block below.
set DEFAULT_JAVA_DEBUG_PORT=5005
set DEFAULT_DEBUG_SUSPEND_FLAG=n
rem Set Debug options if enabled
rem Any non-empty KAFKA_DEBUG enables the block. Values assigned inside the
rem block are read back with delayed expansion because they may have been
rem set within this same parenthesised block.
IF ["%KAFKA_DEBUG%"] NEQ [""] (


	rem Use the default port unless the caller provided JAVA_DEBUG_PORT
	IF ["%JAVA_DEBUG_PORT%"] EQU [""] (
		set JAVA_DEBUG_PORT=%DEFAULT_JAVA_DEBUG_PORT%
	)

	rem Use the default suspend flag unless the caller provided one
	IF ["%DEBUG_SUSPEND_FLAG%"] EQU [""] (
		set DEBUG_SUSPEND_FLAG=%DEFAULT_DEBUG_SUSPEND_FLAG%
	)
	rem JDWP agent string built from the suspend flag and port chosen above
	set DEFAULT_JAVA_DEBUG_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=!DEBUG_SUSPEND_FLAG!,address=!JAVA_DEBUG_PORT!

	rem A caller-provided JAVA_DEBUG_OPTS wins over the generated default
	IF ["%JAVA_DEBUG_OPTS%"] EQU [""] (
		set JAVA_DEBUG_OPTS=!DEFAULT_JAVA_DEBUG_OPTS!
	)

	echo Enabling Java debug options: !JAVA_DEBUG_OPTS!
	rem Debug options are placed in front of any existing KAFKA_OPTS
	set KAFKA_OPTS=!JAVA_DEBUG_OPTS! !KAFKA_OPTS!
)

rem Which java to use: plain java by default, or the quoted binary under
rem JAVA_HOME when that variable is set.
set JAVA=java
if not ["%JAVA_HOME%"] EQU [""] set JAVA="%JAVA_HOME%/bin/java"

rem Memory options
if ["%KAFKA_HEAP_OPTS%"] EQU [""] set KAFKA_HEAP_OPTS=-Xmx256M

rem JVM performance options
if ["%KAFKA_JVM_PERFORMANCE_OPTS%"] EQU [""] set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true

rem Refuse to start when no classpath entry is available.
if defined CLASSPATH goto :launch_jvm
echo Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'
EXIT /B 2

:launch_jvm
rem Assemble the full java command line, with all script arguments appended,
rem and run it.
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
rem echo.
rem echo %COMMAND%
rem echo.
%COMMAND%

goto :eof
rem Subroutine concat: append one entry to CLASSPATH.
rem   argument 1 - a jar path or wildcard entry. Surrounding quotes are
rem                stripped and the entry is stored in its own pair of quotes.
rem The assignments are kept outside parenthesised blocks on purpose: an
rem inherited CLASSPATH that contains a closing parenthesis, such as a path
rem under the x86 Program Files folder, would otherwise end the block early
rem and abort the script with a syntax error. Each path also returns
rem explicitly instead of relying on the end of the file.
:concat
IF not defined CLASSPATH goto :concat_first
set CLASSPATH=%CLASSPATH%;"%~1"
goto :eof

:concat_first
set CLASSPATH="%~1"
goto :eof

4.安裝Zookeeper :

複製 zoo_sample.cfg 副本,修改副本名為zoo.cfg,新建data、log資料夾儲存日誌和資料,然後修改Zookeeper安裝目錄下的conf\zoo.cfg,如圖:

dataDir=C:\\administrator\\software\\zookeeper-3.4.12\\data

dataLogDir=C:\\administrator\\software\\zookeeper-3.4.12\\log

5.好了,配置一般這樣就正常了,去到kafka安裝的根目錄,按住Shift鍵然後右鍵,選擇在此處開啟powershell也就是cmd啦,當然也可以cd過去。

啟動kafka(Kafka啟動時需要連上Zookeeper,所以請先依下文把Zookeeper啟動起來),輸入:

.\bin\windows\kafka-server-start.bat .\config\server.properties

同樣的去到zookeeper安裝的bin目錄,開啟cmd然後輸入:

zkServer.cmd

這樣就OK了。

6.遇到的問題總結:

(1)啟動kafka顯示系統找不到指定路徑,好吧,jdk沒設定好環境變數,參考配置吧:Jdk環境配置

(2)zookeeper報錯: Error:KeeperErrorCode = NoNode for /otter,那麼關閉ZooKeeper,刪除你自己設定的dataDir路徑下的version-2資料夾,然後重啟;

(3)kafka報錯:WARN org.apache.kafka.clients.NetworkClient - Bootstrap broker 127.0.0.1:9092 disconnected,專案中的jar和安裝的版本不一致,建議重灌對應版本的kafka吧;

(4)zookeeper報錯:WARN org.apache.kafka.clients.NetworkClient - Bootstrap broker 127.0.0.1:9092 disconnected,同理(2)的做法;

(5)zookeeper報錯:Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate,先檢查專案的jar和安裝的版本是否一致,一樣的話,開啟C:\Windows\System32\drivers\etc下的hosts檔案,在最後面加入:127.0.0.1  localhost,因為有的新裝的伺服器上localhost不是對應到127.0.0.1的;

(6)kafka報錯:錯誤: 找不到或無法載入主類,那就修改bin\windows\kafka-run-class.bat吧;

(7) zookeeper報錯:系統找不到指定路徑,修改下bin目錄下的zkEnv.cmd加上"。

 

附kafka常用命令(安裝根目錄kafka\bin\windows下shift+右鍵開啟cmd執行):


1、建立Topic:kafka-topics --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic testTopic

2、檢視topic:kafka-topics --list --zookeeper localhost:2181 

3、 啟動生產者:kafka-console-producer --broker-list localhost:9092 --topic testTopic

4、 啟動消費者:kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic --from-beginning

5、修改分割槽數量:kafka-topics --alter --zookeeper localhost:2181  --partitions 3 --topic testTopic

6、檢視topic描述:kafka-topics --zookeeper localhost:2181  --describe --topic testTopic