史上最簡單的kafka實戰教程
你在寫java 版的 kafka程式可能會遇到如下問題
問題一:程式丟擲了org.apache.kafka.common.errors.TimeoutException:
在application.yml 中加入下面這句話 :logging.level.root:debug
然後再看報錯,可以發現下面這個錯誤
錯誤:java.io.IOException: Can't resolve address: ubuntu:9092
原來是無法解析ubuntu.
解決辦法:到C:\Windows\System32\drivers\etc ,用nopaid++開啟hosts檔案 加入下面這句話 遠端主機的ip ubuntu
如 192.168.23.139 ubuntu
好了開始正題了:
一,你得有個虛擬機器,裡面跑著ubuntu
二,環境安裝,1.裝jdk, jdk1.8下載地址 2.環境變數配置
2、解壓原始碼包
通過終端在/usr/local目錄下新建java資料夾,命令列:
2、解壓原始碼包
通過終端在/usr/local目錄下新建java資料夾,命令列:
sudo mkdir /usr/local/java
然後將下載到壓縮包拷貝到java資料夾中,命令列:
進入jdk原始碼包所在目錄
cp jdk-8u25-linux-x64.tar.gz /usr/local/java
然後進入java目錄,命令列:
cd /usr/local/java
解壓壓縮包,命令列:
sudo tar xvf jdk-8u25-linux-x64.tar.gz
然後可以把壓縮包刪除,命令列:
sudo rm jdk-8u25-linux-x64.tar.gz
3、設定jdk環境變數
這裡採用全域性設定方法,它是是所有使用者的共用的環境變數
$sudo gedit ~/.bashrc
開啟之後在末尾新增
export JAVA_HOME=/usr/local/java/jdk1.8.0_25
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
請記住,在上述新增過程中,等號兩側不要加入空格,不然會出現“不是有效的識別符號”,因為source /etc/profile 時不能識別多餘到空格,會理解為是路徑一部分。
然後儲存。
4、檢驗是否安裝成功
在終端輸入如下命令
java -version
下載kafka下載地址 ,解壓檔案
tar zxvf 下載得檔案位置 -C 要解壓得位置 如/Download
1.開啟kafka內建得zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2.開啟kafka服務
bin/kafka-server-start.sh config/server.properties
3.建立一個topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic hello
4.檢視我們建立得topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
5.建立一個消費者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
6建立一個生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello
5.檢視我們的ip
ifconfig
我的ip是
192.168.23.139
一,建立一個springboot專案
二,修改application.properties 為application.yml
開啟pom.xml 檔案新增下面依賴
我的springboot parent 如下
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>這個是要新增的spring-kafka依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.1.7.RELEASE</version> </dependency>
日誌檔案依賴
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
開啟application.yml檔案
spring: kafka: producer: retries: 0 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer bootstrap-servers: 192.168.23.139:9092 consumer: bootstrap-servers: 192.168.23.139:9092 group-id: foo auto-offset-reset: earliest enable-auto-commit: true key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #logging.level.root: debug
建立一個消費者
@Component @Slf4j public class KafkaConsumer { /** * 監聽test主題,有訊息就讀取 * * @param message */ @KafkaListener(topics = {"hello"}) public void consumer(String message) { log.info("c1 消費了hello topic messge:{}", message); } }
再建立一個生產者
@Component @Slf4j public class KafkaSender { @Autowired private KafkaTemplate kafkaTemplate; /** * 傳送訊息到kafka,主題為test */ public void sendTest(){ kafkaTemplate.send("hello","hello,kafka " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"))); } }
application 主程式
@SpringBootApplication @EnableScheduling public class KafkaApplication { @Autowired KafkaSender kafkaSender; @Autowired KafkaConsumer kafkaConsumer; public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } @Scheduled(cron = "0/2 * * * * ? ") public void sendMsg(){ kafkaSender.sendTest(); } }